Wale
Contributor II

Iterative CDC issues with Qlik Sense

Hello all,

I am working on a project to switch a number of Qlik Sense QVDs from full nightly reloads to change data capture (CDC) reloads, where only new or changed rows (identified by a primary key) from our base table in Databricks are inserted into or updated in the QVD that stores all the data.

The current QVD, stored via Qlik Sense, is fed from a base DLT (Delta Live Tables) table in Databricks. We now want to implement CDC instead of a full reload every night, to save on cluster costs.

Only new data, identified by the BOL_UPD_DT field of that base DLT table, will be loaded (added/concatenated) into the existing QVD. BOL_UPD_DT is a timestamp that lets me identify whether a row has been newly inserted or updated.

My goal is to use BOL_UPD_DT and the primary key (PK) from the base DLT table to pick out the new data that should reach the QVD. The base table and the QVD have identically named fields. My question is: how do I carry out the following steps in the data load editor?

1. Select new data (CDC) from base table based on "LastExecTime" and "BeginningThisExecTime" of the QVD reload. 

2. Load the data from the existing QVD.

3. Join the CDC data from the base table (step 1) with the data loaded from the QVD (step 2) on their primary key (PK), inserting rows that are new and updating rows that already exist in the QVD.

How does one script this?

Qlik Sense does a lot of things intuitively and seemingly doesn't give fine-grained control over CDC processing. So far, for the three steps laid out above, I have the following pseudo-script:

===============

Let LastExecTime = ConvertToLocalTime(ReloadTime(), 'Place/TimeZone');
Let BeginningThisExecTime = ConvertToLocalTime(now(), 'Place/TimeZone');
Let LastExecTime = reloadtime();

// Loads data from QVD

LOAD   

PK,
BOL_UPD_DT,
c,
d,
e

 

//Isolate Primary Key for join use using unqualify function

qualify *;
unqualify PK;

// Select newly updated or inserted data from base table

SELECT

PK,
BOL_UPD_DT,
c,
d,
e

FROM base DLT
WHERE BOL_UPD_DT >= #$(LastExecTime)#
AND BOL_UPD_DT < #$(BeginningThisExecTime)#;

=====================

//Next for new QVD inserts I tried:

Concatenate LOAD * FROM QVD 
WHERE NOT EXISTS(PK);   

// This did not work. The data load editor did not even save/compile the script because of a mistake in it.

I need some help here....

12 Replies
henrikalmen
Specialist II

I recommend that you don't use ReloadTime(). It is better to first see what the last (max) record in the qvd is, and use that as base for next data fetch. At least I would do something like that.
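A minimal sketch of that idea, assuming the QVD already contains the BOL_UPD_DT field from the question. The path `lib://DataFiles/base.qvd`, the table name `MaxUpdate`, and the variable names are hypothetical placeholders, and the fallback date is an arbitrary choice that only needs to be earlier than any source row:

```
// Hypothetical path; replace with your own library connection and file name.
LET vQvdPath = 'lib://DataFiles/base.qvd';

IF Not IsNull(QvdCreateTime('$(vQvdPath)')) THEN
    // The QVD exists: read the newest timestamp it contains.
    MaxUpdate:
    LOAD Max(BOL_UPD_DT) AS max_last_update
    FROM [$(vQvdPath)] (qvd);

    // Format the value the way the source database expects it in a quoted literal.
    LET vMaxLastUpdate = Timestamp(Peek('max_last_update', 0, 'MaxUpdate'), 'YYYY-MM-DD hh:mm:ss');
    DROP TABLE MaxUpdate;
ELSE
    // First run: no QVD yet, so start from a date earlier than any source row.
    LET vMaxLastUpdate = '1900-01-01 00:00:00';
END IF
```

Unlike ReloadTime(), this value comes from the stored data itself, so a failed or skipped reload does not create a gap in what gets fetched next time.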

You are using qualify * and that will make column names be prefixed with the table name, so when you later try to concatenate tables you will have a mess.
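The effect can be seen with two small inline tables. This is only an illustration; the table names and values are made up:

```
QUALIFY *;
UNQUALIFY PK;

Existing:
LOAD * INLINE [
PK, c
1, old
];
// Fields in this table are now PK and Existing.c

New:
LOAD * INLINE [
PK, c
1, new
];
// Fields in this table are PK and New.c, so the result is two tables
// associated on PK rather than one table with a shared column c.

// Switch qualification off again for the rest of the script.
UNQUALIFY *;
```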

A simpler approach for you is probably to first load the new data from the database and then concatenate the data from the QVD. Don't load your QVD data until you have fetched the new data. (But you could load what you need from the QVD.)

But it's hard to say what your exact problem is. You say that this is pseudoscript, but you also say it doesn't work when you run it. I'd like to see more of the actual script. 

At the end you say that the script did not save because of a mistake in the script, but the script is always saved no matter what errors it contains. (And it is never compiled.)

Are you using Qlik Sense? SaaS or on-prem? Or some other Qlik product?

Qrishna
Master

Wale
Contributor II
Author

Good afternoon and thank you for responding.

I simplified the use case. I am simply trying to use incremental loads to insert new data from Databricks tables into Qlik Sense QVDs.

I have a sample table I created in Databricks called "qlik_cdc". 

It consists of the following fields: 

CAR_ID,
CAR_MODEL,
CAR_YEAR,
COLOR,
CDC_CREATE_DT,
CDC_UPDATE_DT

where CDC_UPDATE_DT is a date field that will be used to track new records.

I already loaded the initial data (just 5 sample records) from the "qlik_cdc" table in Databricks into a QVD, also called "qlik_cdc".

I now want to be able to: 1) load new data incrementally without having to reload the whole QVD; 2) update existing data in the QVD when the corresponding rows in the "qlik_cdc" table in Databricks have been updated. Basically "Case 3: Insert and Update (No Delete)" from this page: https://help.qlik.com/en-US/qlikview/May2024/Subsystems/Client/Content/QV_QlikView/QVD_Incremental.h... 

So far here is what I have for incremental INSERTS only:

---------
LET lastReloadTime = ReloadTime();

//Let LastExecTime = ConvertToLocalTime(ReloadTime(), 'Place/TimeZone');
//Let ThisExecTime = ConvertToLocalTime(now(), 'Place/TimeZone');
//Let LastExecTime = reloadtime();
 
If qvdcreatetime("lib://Folder_Path/QLIK_CDC/qlik_cdc.qvd") Then
 
unqualify CAR_ID;  -- to be used in PK join for updates
unqualify CDC_UPDATE_DT; -- to be used to track new data
 
//load existing QVD data
EXISTING_DATA:
LOAD CAR_ID, 
     CAR_MODEL, 
     CAR_YEAR, 
     COLOR, 
     CDC_CREATE_DT,
     CDC_UPDATE_DT;
 
END IF
 
CONCATENATE (EXISTING_DATA)
 
[qlik_cdc]:
SELECT `CAR_ID`,
`CAR_MODEL`,
`CAR_YEAR`,
`COLOR`,
`CDC_CREATE_DT`,
`CDC_UPDATE_DT`
FROM `dw_analytics_inprogress`.`qlik_cdc`
WHERE (CDC_UPDATE_DT > $(lastReloadTime));
 
store qlik_cdc into lib://Folder_ATAP-ANALYTICSInProgress/QLIK_CDC/qlik_cdc.qvd;
-------

I am about to start testing this, so I cannot tell you right now whether it works, but I'll start testing in the next hour or so. I will certainly need help with the UPDATES if you have any experience with that, sir.

Thank you so much for responding....
Wale
Contributor II
Author

I've combed through that page for 2 months. Case 3 (Insert and Update, No Delete) seems easy to implement, but a lot of pertinent information is missing from that page.

henrikalmen
Specialist II

What you do is basically: fetch new data (based on the timestamp you have), combine data with previously fetched data in existing qvd, store the qvd under the same name (it will overwrite the old qvd).

You can do it the other way around as well: load existing data from qvd, fetch new data and concatenate it into the table with old data, store the qvd with all data.
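A rough sketch of the first ordering (fetch new rows, then append the old ones), using the field names from the sample table in this thread. It assumes a data connection is already open, that a variable `vMaxLastUpdate` already holds the latest stored CDC_UPDATE_DT as a 'YYYY-MM-DD' string, and that the path, schema name, and table name `Cars` are placeholders:

```
// Placeholder path; replace with your own library connection and file name.
LET vQvdPath = 'lib://Folder_path/QLIK_CDC/qlik_cdc.qvd';

// 1. Fetch rows inserted or updated since the last stored date.
//    ">=" re-fetches rows from the latest stored day, which is harmless
//    because step 2 drops their old versions.
Cars:
SQL SELECT `CAR_ID`, `CAR_MODEL`, `CAR_YEAR`, `COLOR`, `CDC_CREATE_DT`, `CDC_UPDATE_DT`
FROM `schema_name`.`qlik_cdc`
WHERE CDC_UPDATE_DT >= '$(vMaxLastUpdate)';

// 2. Append only those stored rows whose key was not just fetched,
//    so a changed row replaces its old version instead of duplicating it.
IF Not IsNull(QvdCreateTime('$(vQvdPath)')) THEN
    Concatenate (Cars)
    LOAD CAR_ID, CAR_MODEL, CAR_YEAR, COLOR, CDC_CREATE_DT, CDC_UPDATE_DT
    FROM [$(vQvdPath)] (qvd)
    WHERE Not Exists(CAR_ID);
END IF

// 3. Overwrite the QVD with the merged table.
STORE Cars INTO [$(vQvdPath)] (qvd);
DROP TABLE Cars;
```

With the reverse ordering (old data first, new data concatenated afterwards), an updated row would sit next to its old version unless the old rows are filtered out some other way, which is why this ordering is usually the easier one when updates matter.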

Wale
Contributor II
Author

After testing, I agree with this statement: "I recommend that you don't use ReloadTime(). It is better to first see what the last (max) record in the qvd is, and use that as base for next data fetch. At least I would do something like that."

How do I grab Max(CDC_UPDATE_DT) and assign it to a variable I can use in the insert query?

LET lastReloadTime = Date(Max(CDC_UPDATE_DT); -- is not working

full code as of now:

====

LET lastReloadTime = Date(Max(CDC_UPDATE_DT);

QV_Table:
SELECT `CAR_ID`,
`CAR_MODEL`,
`CAR_YEAR`,
`COLOR`,
`CDC_CREATE_DT`,
`CDC_UPDATE_DT`
FROM `schema`.`qlik_cdc`
WHERE (CDC_UPDATE_DT > $(lastReloadTime));
 
Concatenate LOAD CAR_ID, 
     CAR_MODEL, 
     CAR_YEAR, 
     COLOR, 
     CDC_CREATE_DT,
     CDC_UPDATE_DT
     WHERE NOT Exists(CAR_ID); 
 
STORE QV_Table INTO "lib://Folder_path/QLIK_CDC/qlik_cdc.qvd";
=====

My issue now is getting Max(CDC_UPDATE_DT) from the QVD.

 

Wale
Contributor II
Author

This is where I am now:
=====

TMP:
Load date(max(CDC_UPDATE_DT)) as MaxDate
From lib://Folder_path/QLIK_CDC/qlik_cdc.qvd (QVD);
LET MaxDate = Peek('MaxDate',0,'TMP');
DROP Table TMP;
 
//
QV_Table:
SELECT `CAR_ID`,
`CAR_MODEL`,
`CAR_YEAR`,
`COLOR`,
`CDC_CREATE_DT`,
`CDC_UPDATE_DT`
FROM `schema_name`.`qlik_cdc`
WHERE (CDC_UPDATE_DT > $(MaxDate));
 
Concatenate LOAD CAR_ID, 
     CAR_MODEL, 
     CAR_YEAR, 
     COLOR, 
     CDC_CREATE_DT,
     CDC_UPDATE_DT
     WHERE NOT Exists(CAR_ID); 
 
STORE QV_Table INTO "lib://Folder_ATAP-ANALYTICSInProgress/QLIK_CDC/qlik_cdc.qvd";
======
 
I am getting the following error:
 
The following error occurred:
ERROR [42K09] [Qlik][Hardy] (80) Syntax or semantic analysis error thrown in server while executing query. Error message from server: org.apache.hive.service.cli.HiveSQLException: Error running query: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] org.apache.spark.sql.catalyst.ExtendedAnalysisException: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(CDC_UPDATE_DT > ((12 / 24) / 2024))" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("DATE" and "DOUBLE"). SQLSTATE: 42K09; line 8 pos 7 at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:49) at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$1(SparkExecuteStatementOperation.scala:805) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104) at org...
 
The error occurred here:
QV_Table: SELECT
`CAR_ID`,
`CAR_MODEL`,
`CAR_YEAR`,
`COLOR`,
`CDC_CREATE_DT`,
`CDC_UPDATE_DT`
FROM `dw_analytics_inprogress`.`qlik_cdc`
WHERE (CDC_UPDATE_DT > 12/24/2024)
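The error message suggests what went wrong: the variable expanded to the bare text 12/24/2024, which the server read as the arithmetic expression (12 / 24) / 2024, a DOUBLE, instead of a date. A small sketch of the difference, using a hard-coded date and hypothetical variable names purely for illustration:

```
// Same date, two text formats.
LET vRaw = Date(MakeDate(2024, 12, 24), 'M/D/YYYY');      // 12/24/2024
LET vIso = Date(MakeDate(2024, 12, 24), 'YYYY-MM-DD');    // 2024-12-24

// Print what the SQL text would look like after variable expansion.
TRACE Unquoted: WHERE CDC_UPDATE_DT > $(vRaw);
TRACE Quoted:   WHERE CDC_UPDATE_DT > '$(vIso)';
```

The second form sends a quoted ISO date literal, which a DATE column can normally be compared against; whether the server accepts that implicit conversion is an assumption to verify against your Databricks setup.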
Wale
Contributor II
Author

This finally worked without erroring out, just need to make sure it did what I wanted:

====================

LastUpdateDate:
Load
MAX(CDC_UPDATE_DT) as max_last_update
FROM lib://Folder_path/QLIK_CDC/qlik_cdc.qvd (QVD);
LET vMaxLastUpdate = Date(Peek('max_last_update', 0, 'LastUpdateDate'), 'YYYY-MM-DD'); // table name must match the label above
DROP Table LastUpdateDate;
 
Incremental:
SELECT `CAR_ID`,
`CAR_MODEL`,
`CAR_YEAR`,
`COLOR`,
`CDC_CREATE_DT`,
`CDC_UPDATE_DT`
FROM `dw_schema`.`qlik_cdc`
WHERE CDC_UPDATE_DT > '$(vMaxLastUpdate)';
 
Concatenate 
LOAD CAR_ID, 
     CAR_MODEL, 
     CAR_YEAR, 
     COLOR, 
     CDC_CREATE_DT,
     CDC_UPDATE_DT
     FROM lib://Folder_path/QLIK_CDC/qlik_cdc.qvd (QVD)
     WHERE NOT Exists(CAR_ID); 
     
Inner Join SQL SELECT CAR_ID FROM `dw_analytics_inprogress`.`qlik_cdc`;
 
STORE Incremental INTO "lib://Folder_path/QLIK_CDC/qlik_cdc.qvd";
 
Drop Table Incremental;
Wale
Contributor II
Author

Okay, I'm moving pretty quickly here. The incremental inserts work now.


I just need to figure out updates: how do I update records that already exist in the QVD when their source rows in Databricks have been updated (not new inserts, but updates to existing rows)?
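For what it's worth, the pattern of fetching changed rows first and then concatenating the QVD with WHERE NOT Exists(CAR_ID) should already cover updates, as long as an update in Databricks also bumps CDC_UPDATE_DT so that the changed row is fetched. A self-contained sketch with made-up inline data standing in for both the Databricks result and the existing QVD:

```
// Stand-in for the rows fetched from Databricks:
// CAR_ID 2 was updated, CAR_ID 6 is new.
Incremental:
LOAD * INLINE [
CAR_ID, COLOR, CDC_UPDATE_DT
2, Red, 2025-01-14
6, Blue, 2025-01-14
];

// Stand-in for the existing QVD. Rows whose CAR_ID was already
// loaded above are skipped, so the old version of CAR_ID 2 is dropped.
Concatenate (Incremental)
LOAD * INLINE [
CAR_ID, COLOR, CDC_UPDATE_DT
1, White, 2024-12-24
2, Green, 2024-12-24
3, Black, 2024-12-24
]
WHERE Not Exists(CAR_ID);
```

The resulting table holds CAR_ID 1, 2, 3 and 6, with CAR_ID 2 appearing once in its updated form. One caveat: because CDC_UPDATE_DT is a date rather than a timestamp, a strict ">" comparison against the latest stored date can miss rows updated later on that same day; comparing with ">=" re-fetches that day's rows and lets the NOT Exists filter remove their old versions.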