Unlock a world of possibilities! Login now and discover the exclusive benefits awaiting you.
Good morning all
I am hoping for some help with a script we have in use for incremental loads. It has been written by an external party and we've just discovered it is not doing what it should. I'm nervous of making any changes without advice
When a record is deleted or updated, the record/previous version of the record is pushed to a deleted records table. The script should recognise deleted and updated records and should remove or update them as appropriate.
We have found however, that the updated records are being excluded because they are found in the deleted table.
I've pasted the part of the script that deals with is below. Logic tells me that we should be looking for the deleted records first and then looking for the updated and new records, rather than the other way round...(?)
Any assistance would be greatly appreciated. Thanks in advance!
Rebekah
Script:
Extract – 1: Load – Incremental
IF '$(vLoadMode)' = 'incremental' THEN
TRACE ######################################;
TRACE #### Incremental Load ####;
TRACE ######################################;
MaxIndexTable:
LOAD maxIndex, TableName FROM $(vOutputPath)QVDLoadSummary\QVD_Load_Summary_$(vWeigth)_$(vTableType)_$(vReloadPrevLoadTime).qvd (qvd);
FOR i = 0 TO $(vNoOfTables) - 1
LET vReloadStartTime = num(now());
LET vDatabasePrefix = peek('DatabasePrefix', $(i), 'LoadControlTable');
LET vDatabaseName = peek('DatabaseName', $(i), 'LoadControlTable');
LET vSchema = peek('Schema', $(i), 'LoadControlTable');
LET vTableName = peek('TableName', $(i), 'LoadControlTable');
LET vTablePrefix = peek('TablePrefix', $(i), 'LoadControlTable');
LET vDelRecTablePrefix = 'del_iwms_';
LET vFields = peek('Fields', $(i), 'LoadControlTable');
LET vPrimaryKey = peek('PrimaryKey', $(i), 'LoadControlTable');
LET vDate = peek('DateField', $(i), 'LoadControlTable');
IF len('$(vDate)') > 0 THEN
LET vWhereClouse = 'WHERE ' & '$(vDate)' & ' >= ' & chr(39) & '$(vSplitDate)' & chr(39);
ENDIF
TRACE ;
TRACE #### $(vDatabaseName) - $(vTableName) ####;
// Loading previous summary table to get max latest index for the current table
TRACE -- MaxIndexTable;
NoConcatenate
MaxIndexTmp:
LOAD maxIndex RESIDENT MaxIndexTable
WHERE TableName = '$(vDatabaseName)_$(vTableName)';
LET vMaxIndex = peek('maxIndex', 0,'MaxIndexTmp');
DROP Table MaxIndexTmp;
//=====================================================================
// if index is empty then start form 0 (take all records)
IF len('$(vMaxIndex)') = 0 THEN
LET vMaxIndex = 0;
ENDIF
//=====================================================================
// control number of records from the db
Concatenate(db_records)
LOAD
'$(vDatabaseName)' & '_' & '$(vTableName)' as TableName,
Records as dbRecords;
SQL SELECT count(*) as Records FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vTablePrefix)$(vTableName)
$(vWhereClouse);
//====================================================================================================================
// Loading new and updated records.
TRACE -- New Records;
$(vDatabaseName)_$(vTableName):
SQL SELECT '$(vDatabaseName)' as DB,
'$(vDatabaseName)' +'|'+ cast($(vPrimaryKey) as varchar) as _PrimaryKey,
$(vFields),
cast(rowversion as bigint) as _RowVersion
FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vTablePrefix)$(vTableName)
WHERE cast(rowversion as bigint) > $(vMaxIndex);
LET vNewRecordsCount = NoOfRows('$(vDatabaseName)_$(vTableName)');
//====================================================================================================================
// Deleted records
TRACE -- Deleted Records;
// All tables have to have the _RecordDeletedFlag field because in etl is using that field
Concatenate($(vDatabaseName)_$(vTableName))
LOAD * INLINE [ _RecordDeletedFlag ];
//============================================================================================================
// Loading records from "deleted records" table to flag deleted records.
Concatenate($(vDatabaseName)_$(vTableName))
SQL SELECT
'$(vDatabaseName)' +'|'+ cast($(vPrimaryKey) as varchar) as _PrimaryKey,
1 as _RecordDeletedFlag
FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vDelRecTablePrefix)$(vTableName);
LET vNoOfNewDeletedReocrds = NoOfRows('$(vDatabaseName)_$(vTableName)');
TRACE -- No Of New & Reocrds: $(vNoOfNewDeletedReocrds);
// Loading stored data from the qvd, filtered by the primary key to exclude updated records loaded from the qvd to memory
TRACE -- QVD Records;
IF $(vNoOfNewDeletedReocrds) > 0 THEN
Concatenate($(vDatabaseName)_$(vTableName))
LOAD *
FROM $(vOutputPath)$(vDatabaseName)_$(vTableName).qvd (qvd)
WHERE NOT EXISTS (_PrimaryKey);
// saving current max index for the rowversion field
Concatenate(max_index)
LOAD '$(vDatabaseName)' & '_' & '$(vTableName)' as TableName,
_RowVersion as maxIndex;
SQL SELECT cast(max(rowversion) as bigint) as _RowVersion FROM $(vDatabasePrefix)$(vDatabaseName).$(vSchema).$(vTablePrefix)$(vTableName);
// Store and load summary
STORE * FROM $(vDatabaseName)_$(vTableName) INTO '$(vOutputPath)$(vDatabaseName)_$(vTableName).qvd' (qvd);
ELSE
Concatenate(max_index)
LOAD '$(vDatabaseName)' & '_' & '$(vTableName)' as TableName,
$(vMaxIndex) as maxIndex
AUTOGENERATE (1);
Concatenate($(vDatabaseName)_$(vTableName))
LOAD _PrimaryKey
FROM $(vOutputPath)$(vDatabaseName)_$(vTableName).qvd (qvd);
ENDIF
LET vReloadEndTime = num(now());
CALL QVDSUmmary('$(vDatabaseName)_$(vTableName)', '$(vOutputPath)$(vDatabaseName)_$(vTableName).qvd');
DROP Table $(vDatabaseName)_$(vTableName);
// Table Reload Time
LET vReloadTimeInterval = interval($(vReloadEndTime) - $(vReloadStartTime), 'hh:mm:ss');
LET vDate = null();
LET vWhereClouse = null();
NEXT i
DROP Table MaxIndexTable;
ENDIF