Unlock a world of possibilities! Log in now and discover the exclusive benefits awaiting you.
Hello All,
I have a script that incrementally splits a very large QVD into partitioned QVDs. My question is: what happens if the partitioned QVDs themselves get corrupted? Is there any way to identify only the corrupted QVDs? Below is the code that I use.
// ---------------------------------------------------------------------------
// Configuration for the partitioned incremental load.
// Everything below is consumed by the loadIncrementalPartitioned sub.
// ---------------------------------------------------------------------------
// Flag tested by the sub before it (re)writes the partition QVDs.
LET custom_val = 'yes';
// Width of each partition, expressed in units of the key field: a row lands in
// partition floor(key / _chunk). Change it to build bigger or smaller QVDs.
// NOTE(review): with a date/timestamp key this is presumably a number of days - confirm.
LET _chunk = 5000;
// Field used to partition the data
LET _Key='CREATED_ON';
// First part of every QVD file name (partition files and the flag file)
LET _QVDname='Logger_Table' & _Key;
// Extraction SQL query (the incremental WHERE clause is appended separately)
SET _SQLquery=SELECT * FROM Logger_Table;
// Data source fields read and loaded into Qlik
SET _FieldsLoaded= *;
// Optional field transformations
SET _transformFields= ''; // Optional; may be empty, but if it is not empty it must begin with ","
// Folder that holds the partition QVDs and the flag file
SET _QVDsPath ='lib://Everyone/Extract QVDs/';
// Start from the default timestamp; it is overridden below when QVDs already exist
SET vLastExecTime = $(vLastExecTimeDefault);
// If the first full load has already been done (the flag file written by the sub
// exists), find the latest CREATED_ON already stored. It is used to build the delta set.
// QvdCreateTime takes a single file name, so the flag file is tested instead of a
// wildcard mask.
IF NOT ISNULL(QvdCreateTime('$(_QVDsPath)$(_QVDname)_Flag.QVD')) THEN
    // Read only the date column from the partition files written by the sub
    // (same folder and same file-name prefix), which keeps the read time short.
    TMP:
    LOAD CREATED_ON
    FROM '$(_QVDsPath)$(_QVDname)=*.QVD' (qvd);
    // Aggregate on the small temporary table rather than on the QVDs directly.
    LoadTime:
    LOAD Max(CREATED_ON) as MaxLastModifiedDate
    RESIDENT TMP;
    LET vLastExecTime = Timestamp(Peek('MaxLastModifiedDate',0,'LoadTime'),'YYYY-MM-DD hh:mm:ss');
    // The helper tables are no longer needed; do not leave them in the data model.
    DROP TABLES TMP, LoadTime;
END IF
// WHERE clause which detects the new rows (inserts + updates).
// It must be defined AFTER vLastExecTime has its final value: the dollar-sign
// expansion below is resolved when this SET statement is executed.
SET _incrementalWhere= WHERE CREATED_ON >= to_date('$(vLastExecTime)','YYYY-MM-DD hh24:mi:ss');
// By changing the values of the variables _chunk, _Key, _QVDname, _SQLquery, _incrementalWhere, _FieldsLoaded, _QVDsPath and _transformFields, this script can easily be adapted to create other partitioned incremental loads.
// loadIncrementalPartitioned
// Extracts rows from the data source and stores them in partitioned QVD files
// named <_QVDsPath><V_QVDname>=[<since>;<upto>].QVD.
//   V_chunck                - width of one partition, in units of the key field
//   V_KEY                   - name of the numeric field used to partition
//   V_QVDname               - first part of the QVD file names
//   V_SQLquery              - SQL extraction query without a WHERE clause
//   V_incrementalWhere      - WHERE clause appended on incremental runs
//   V_fieldsLoaded          - field list of the preceding LOAD
//   V_fieldsTransformations - currently not used by this sub
// Also reads the global variables _QVDsPath and custom_val.
// File names are single-quoted because they contain spaces, '[', ';' and ']'.
Sub loadIncrementalPartitioned (V_chunck,V_KEY,V_QVDname,V_SQLquery,V_incrementalWhere,V_fieldsLoaded,V_fieldsTransformations)
// The flag file exists once the first full load has been done
IF NOT ISNULL(QvdCreateTime('$(_QVDsPath)$(V_QVDname)_Flag.QVD')) THEN
    // Incremental run: extract only the rows selected by the incremental WHERE clause
    [pre_CUSTOM_TABLE]:
    LOAD $(V_fieldsLoaded);
    SQL $(V_SQLquery)
    $(V_incrementalWhere);
    // For each extracted row calculate the partition it belongs to
    [CUSTOM_TABLE]:
    LOAD *,
        floor($(V_KEY)/$(V_chunck)) as TARGET_QVD
    RESIDENT [pre_CUSTOM_TABLE];
    DROP TABLE [pre_CUSTOM_TABLE];
    // Small table listing the partitions touched by the extracted rows
    AffectedQVDs:
    LOAD DISTINCT TARGET_QVD
    RESIDENT CUSTOM_TABLE;
    // Number of partitions that have to be merged with their existing QVD
    LET rows = NoOfRows('AffectedQVDs');
    FOR row=1 to rows
        // Assemble the range part of the QVD file name, e.g. [0;4999]
        LET txtQVD = '[' & (FieldValue('TARGET_QVD',row) * V_chunck) & ';' & ((FieldValue('TARGET_QVD',row)+1) * V_chunck - 1) & ']';
        // If the QVD already exists, append its rows to the fresh ones, discarding
        // the stored versions of records that were just re-extracted.
        // NOTE(review): the de-duplication field is hard-coded as PRIMARY_KEY and is
        // not derived from V_KEY - confirm that this field exists in the source table.
        IF NOT ISNULL(QvdCreateTime('$(_QVDsPath)$(V_QVDname)=$(txtQVD).QVD')) THEN
            CONCATENATE (CUSTOM_TABLE)
            LOAD *
            FROM '$(_QVDsPath)$(V_QVDname)=$(txtQVD).QVD' (QVD)
            WHERE NOT EXISTS (PRIMARY_KEY);
        ENDIF
        LET custom_val = 'yes';
    NEXT
    // The list of affected partitions is no longer needed
    DROP TABLE AffectedQVDs;
ELSE
    // First run of this script: a full SQL extraction is necessary
    CUSTOM_TABLE:
    LOAD $(V_fieldsLoaded);
    SQL $(V_SQLquery);
    // Store a dummy file whose only purpose is to signal that the first full load was done
    flagFile:
    NoConcatenate
    LOAD 1 as dummy
    AutoGenerate 1;
    LET nm = '$(_QVDsPath)$(V_QVDname)_Flag.QVD';
    STORE flagFile INTO '$(nm)';
    DROP TABLE flagFile;
    LET custom_val = 'yes';
END IF
IF '$(custom_val)' = 'yes' THEN
    // Recover the max and min value of the field used to partition
    MAXI_MINI:
    LOAD min($(V_KEY)) as minPK,
        max($(V_KEY)) as maxPK
    RESIDENT CUSTOM_TABLE;
    LET maxPK=round(Peek('maxPK',0,'MAXI_MINI'));
    LET minPK=round(Peek('minPK',0,'MAXI_MINI'));
    DROP TABLE MAXI_MINI;
    // Number of partitions needed to reach maxPK. Partition V_I covers
    // [(V_I-1)*chunk ; V_I*chunk - 1], so a key that is an exact multiple of the
    // chunk belongs to the NEXT partition: floor()+1 is required here, ceil()
    // would leave that row unstored.
    LET partitions=floor(maxPK/V_chunck)+1;
    LET V_I=1;
    DO WHILE V_I<= partitions
        // Skip the partitions that lie entirely below the smallest key
        IF (V_I*V_chunck>=minPK) THEN
            // Extract the rows of this partition
            toStore:
            NoConcatenate
            LOAD *
            RESIDENT CUSTOM_TABLE
            WHERE $(V_KEY)>= ($(V_I)-1)*$(V_chunck) and $(V_KEY)<$(V_I)*$(V_chunck);
            LET q = NoOfRows('toStore');
            // Only write the partition when it is not empty
            IF q>0 THEN
                LET since=(V_I-1)*V_chunck ;
                LET upto=(V_I*V_chunck)-1;
                // Store the partition under a file name that describes its key range
                LET nm = '$(_QVDsPath)$(V_QVDname)=[$(since);$(upto)].QVD';
                STORE toStore INTO '$(nm)';
            ENDIF
            DROP TABLE toStore;
        ENDIF
        LET V_I=V_I+1;
    LOOP
    DROP TABLE CUSTOM_TABLE;
END IF
End Sub
// Run the partitioned incremental load with the configuration variables defined above.
// Call is a control statement, so the whole call must stay on a single script line.
call loadIncrementalPartitioned(_chunk,_Key, _QVDname, _SQLquery, _incrementalWhere, _FieldsLoaded, _transformFields);