Skip to main content
Announcements
Introducing Qlik Answers: A plug-and-play, Generative AI powered RAG solution. READ ALL ABOUT IT!
cancel
Showing results for 
Search instead for 
Did you mean: 
hartleyr
Contributor III
Contributor III

Incremental Loads - Script not doing what it should...

Good morning all

I am hoping for some help with a script we have in use for incremental loads.  It has been written by an external party and we've just discovered it is not doing what it should. I'm nervous of making any changes without advice

When a record is deleted or updated, the record/previous version of the record is pushed to a deleted records table.  The script should recognise deleted and updated records and should remove or update them as appropriate.

We have found however, that the updated records are being excluded because they are found in the deleted table. 

I've pasted the part of the script that deals with is below.  Logic tells me that we should be looking for the deleted records first and then looking for the updated and new records, rather than the other way round...(?)

Any assistance would be greatly appreciated.  Thanks in advance!

Rebekah

Script:

Extract – 1: Load – Incremental

 

IF '$(vLoadMode)' = 'incremental' THEN

     TRACE ######################################;
     TRACE ####      Incremental Load          ####;
     TRACE ######################################;

     MaxIndexTable:
     LOAD maxIndex, TableName FROM $(vOutputPath)QVDLoadSummary\QVD_Load_Summary_$(vWeigth)_$(vTableType)_$(vReloadPrevLoadTime).qvd (qvd);

     FOR i = 0 TO $(vNoOfTables) - 1
    
           LET vReloadStartTime = num(now());
          
                LET vDatabasePrefix = peek('DatabasePrefix', $(i), 'LoadControlTable'); 
                LET vDatabaseName = peek('DatabaseName', $(i), 'LoadControlTable');
                LET vSchema = peek('Schema', $(i), 'LoadControlTable');
                LET vTableName = peek('TableName', $(i), 'LoadControlTable');
                LET vTablePrefix = peek('TablePrefix', $(i), 'LoadControlTable');
                LET vDelRecTablePrefix = 'del_iwms_';
                LET vFields = peek('Fields', $(i), 'LoadControlTable');
                LET vPrimaryKey = peek('PrimaryKey', $(i), 'LoadControlTable');
                LET vDate = peek('DateField', $(i), 'LoadControlTable');
               
                IF len('$(vDate)') > 0 THEN
                     LET vWhereClouse = 'WHERE ' & '$(vDate)' & ' >= ' & chr(39) & '$(vSplitDate)' & chr(39);
                ENDIF
    
                TRACE ;
                TRACE #### $(vDatabaseName) - $(vTableName) ####;
          
                // Loading previous summary table to get max latest index for the current table
                TRACE -- MaxIndexTable;
                           NoConcatenate
                           MaxIndexTmp:
                           LOAD maxIndex RESIDENT MaxIndexTable
                                WHERE TableName = '$(vDatabaseName)_$(vTableName)';
                                LET vMaxIndex = peek('maxIndex', 0,'MaxIndexTmp');
                           DROP Table MaxIndexTmp;
                          
                           //=====================================================================
                           // if index is empty then start form 0 (take all records)
                           IF len('$(vMaxIndex)') = 0  THEN
                                LET vMaxIndex = 0;
                           ENDIF
          
                           //=====================================================================
                           // control number of records from the db
                           Concatenate(db_records)
                           LOAD
                                '$(vDatabaseName)' & '_' & '$(vTableName)' as TableName,
                                Records as dbRecords;
                           SQL SELECT count(*) as Records FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vTablePrefix)$(vTableName)
                           $(vWhereClouse);
                          
           //====================================================================================================================          
                // Loading new and updated records.
                TRACE -- New Records;
                          
                           $(vDatabaseName)_$(vTableName):
                           SQL SELECT '$(vDatabaseName)' as DB,
                                '$(vDatabaseName)' +'|'+ cast($(vPrimaryKey) as varchar) as _PrimaryKey,
                                $(vFields),
                                cast(rowversion as bigint) as _RowVersion
                           FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vTablePrefix)$(vTableName)
                                WHERE cast(rowversion as bigint)  > $(vMaxIndex);
                          
                LET vNewRecordsCount = NoOfRows('$(vDatabaseName)_$(vTableName)');
               
           //====================================================================================================================                          
                //   Deleted records     
                TRACE -- Deleted Records;
               
                           // All tables have to have the _RecordDeletedFlag field because in etl is using that field
                           Concatenate($(vDatabaseName)_$(vTableName))
                           LOAD * INLINE [ _RecordDeletedFlag ];
                          
                     //============================================================================================================
                           // Loading records from "deleted records" table to flag deleted records.
                           Concatenate($(vDatabaseName)_$(vTableName))
                           SQL SELECT
                                '$(vDatabaseName)' +'|'+ cast($(vPrimaryKey) as varchar) as _PrimaryKey,
                                1 as _RecordDeletedFlag
                           FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vDelRecTablePrefix)$(vTableName);
                               
                LET vNoOfNewDeletedReocrds = NoOfRows('$(vDatabaseName)_$(vTableName)');
                TRACE -- No Of New & Reocrds: $(vNoOfNewDeletedReocrds);
                          
                // Loading stored data from the qvd, filtered by the primary key to exclude updated records loaded from the qvd to memory
                TRACE -- QVD Records;
                     IF $(vNoOfNewDeletedReocrds) > 0 THEN
                           Concatenate($(vDatabaseName)_$(vTableName))
                           LOAD *
                           FROM  $(vOutputPath)$(vDatabaseName)_$(vTableName).qvd (qvd)
                           WHERE NOT EXISTS (_PrimaryKey);
                          
                     // saving current max index for the rowversion field
                           Concatenate(max_index)
                           LOAD '$(vDatabaseName)' & '_' & '$(vTableName)' as TableName,
                                     _RowVersion as maxIndex;
                           SQL SELECT cast(max(rowversion) as bigint) as _RowVersion FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vTablePrefix)$(vTableName);
                          
                     // Store and load summary
                           STORE * FROM $(vDatabaseName)_$(vTableName) INTO '$(vOutputPath)$(vDatabaseName)_$(vTableName).qvd' (qvd);
                     ELSE
                           Concatenate(max_index)
                           LOAD '$(vDatabaseName)' & '_' & '$(vTableName)' as TableName,
                                      $(vMaxIndex) as maxIndex
                           AUTOGENERATE (1);
                          
                           Concatenate($(vDatabaseName)_$(vTableName))
                           LOAD _PrimaryKey
                           FROM  $(vOutputPath)$(vDatabaseName)_$(vTableName).qvd (qvd);
                     ENDIF
               
           LET vReloadEndTime = num(now());
           CALL QVDSUmmary('$(vDatabaseName)_$(vTableName)', '$(vOutputPath)$(vDatabaseName)_$(vTableName).qvd');
               
           DROP Table $(vDatabaseName)_$(vTableName);
          
           // Table Reload Time
           LET vReloadTimeInterval = interval($(vReloadEndTime) - $(vReloadStartTime), 'hh:mm:ss');
          
           LET vDate = null();
           LET vWhereClouse = null();
    
     NEXT i
    
     DROP Table MaxIndexTable;
    
ENDIF

 

Labels (4)
0 Replies