INESBK
Creator

[CDC] Insert only data that has changed since the last run (TOS_DI)

I would like to insert only the new data into my database. I tried incremental loading by comparing my source (a set of files) with my target (a SQL Server table) using an inner join, but since the number of rows inserted in the database is huge, this solution is not feasible.
So I thought of doing CDC by date comparison (the date of the last run versus the current date).
Unfortunately, I don't know how to do it.
Can someone help me, please?

 

27 Replies
INESBK
Creator
Author

In fact, this is what I am looking for. My table has a timestamp, so I want to compare its max date with the incoming dates in the files.
Should this comparison go in the tMSSqlInput query or in the tMap filter?

And should I delete the inner join?

 

vapukov
Master II

I do this in 2 steps:

 

step 1:

Input from database, something like this (AWS Redshift):

"SELECT dateadd(day, 1, date(Max(\"whendt\"))) as last_date from "+context.AWS_RS_schema+"."+"tbldatatransactions"
or

"SELECT NVL(max(TransactionID), 0)  as maxid  FROM "+context.AWS_RS_schema+".tbltransactions"

then tFlowToIterate to save this value to a variable

 

then in the next steps, use the variable for:

- filter in SQL queries in Input components

"SELECT 
     something
FROM `tblDataTransactions`
WHERE `When` >= " + "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", (Date)globalMap.get("v_last_date")) + "'"  + " AND `When` < "  
+ "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", TalendDate.getCurrentDate()) + "'" 

- tFilter or tMap if the source is CSV files or another format
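
A minimal sketch of how the date-window filter above might be assembled in plain Java, outside Talend. It assumes `java.text.SimpleDateFormat` as a stand-in for `TalendDate.formatDate`, and the class `WindowQuery` and its methods are hypothetical names used only for illustration; the table and column names are copied from the query above.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.GregorianCalendar;

// Hypothetical helper class for illustration; not part of Talend.
class WindowQuery {

    // Formats a date as midnight of that day, using the same pattern as the query above.
    // Digits and colons are literals in a SimpleDateFormat pattern.
    static String midnight(Date d) {
        return new SimpleDateFormat("yyyy-MM-dd 00:00:00").format(d);
    }

    // Builds a half-open window: from the last loaded day (inclusive) up to today (exclusive).
    static String buildWindowQuery(Date lastDate, Date today) {
        return "SELECT something FROM `tblDataTransactions`"
             + " WHERE `When` >= '" + midnight(lastDate) + "'"
             + " AND `When` < '" + midnight(today) + "'";
    }

    public static void main(String[] args) {
        // Months are 0-based in GregorianCalendar, so 4 means May.
        Date last = new GregorianCalendar(2017, 4, 27).getTime();
        Date now = new GregorianCalendar(2017, 4, 28, 13, 30).getTime();
        System.out.println(buildWindowQuery(last, now));
    }
}
```

Because the window is half-open, a row stamped exactly at midnight falls into one run only.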

 

 0683p000009Lv7K.png

INESBK
Creator
Author

I tried to apply your method. 

Here is a screenshot 0683p000009LuaN.png

So in tMSSqlInput_1 I used this query: "SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

then tFlowToIterate to save this value to the variable "v_last_date"

After that, in tMSSqlInput_2 I used this query:

"SELECT
Trends2.chrono,Trends2.name,Trends2.value,Trends2.quality
FROM Trends2
WHERE S.TS >= " + "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", (Date)globalMap.get("v_last_date")) + "'" + " AND S.TS < "
+ "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", TalendDate.getCurrentDate()) + "'"

S.TS is a timestamp retrieved from tMap_2.

But when I click Guess schema I get this error:

0683p000009LujC.png

vapukov
Master II

Of course, Guess schema will not work with a null value, and at this moment the variable is null.

If you want to use Guess schema, change the query temporarily for that purpose.

 

But (!!!) you are doing something completely wrong.

 

First:

"SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

returns an INT

 

but then you pass that INT to the date formatter (TalendDate.formatDate)
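
To illustrate the type mismatch, here is a small sketch in plain Java. It assumes the day count ends up stored as an `Integer` under the key `v_last_date`, and it uses an ordinary `HashMap` as a stand-in for Talend's `globalMap`; the class `CastCheck` and the helper `asDate` are hypothetical.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; a plain HashMap stands in for Talend's globalMap.
class CastCheck {

    // Returns the stored value as a Date, or null if it is missing or has another type.
    static Date asDate(Map<String, Object> globalMap, String key) {
        Object v = globalMap.get(key);
        return (v instanceof Date) ? (Date) v : null;
    }

    public static void main(String[] args) {
        Map<String, Object> globalMap = new HashMap<>();
        globalMap.put("v_last_date", 5); // a DATEDIFF result is a day count, not a date

        try {
            Date d = (Date) globalMap.get("v_last_date"); // same cast as in the WHERE clause
            System.out.println(d);
        } catch (ClassCastException e) {
            System.out.println("v_last_date is not a Date: " + e.getMessage());
        }

        System.out.println(asDate(globalMap, "v_last_date")); // the safe accessor yields null here
    }
}
```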

 

Second:

In my example this variable is used to filter the main input, which in your case is Lecture_DBF.

In my example I take the MAX from AWS Redshift and use it to filter data from MySQL; you are trying to use it on the same table.

If you want to use it to filter the lookup data, you must change the logic. There is no reason to request the MAX value from the lookup table and then use it to filter that same lookup table: it will always return an empty result.

 

"SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

"SELECT
Trends2.chrono,Trends2.name,Trends2.value,Trends2.quality
FROM Trends2
WHERE S.TS >= " + "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", (Date)globalMap.get("v_last_date")) + "'" + " AND S.TS < "
+ "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", TalendDate.getCurrentDate()) + "'"


Double-check what you request and what you want to filter.

 

INESBK
Creator
Author

First:

When I write this query "SELECT datediff(day, date(Max(Trends2.TS)), date(getdate())) as last_load FROM Trends2"

I get this error:

0683p000009LuP0.png 

For this reason I changed my query to: "SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

Second:

I can't do the filtering in Lecture_DBF because this component is used to execute a Python script that reads file.dbf.

For this reason I use S.TS from the tMap to retrieve the date that comes from the file.

 

My job reads .dbf files and inserts them into a SQL Server table.
Every time I run it there are new values to insert into the database (since this is data from sensors), so I need to insert only the new rows.
I used the inner join at first, but it doesn't work because of memory problems, so I have to use a comparison on dates.

The idea is to compare date_max in the table (tMSSqlInput) with date_in_file.
If date_in_file > date_in_base I insert the row; otherwise I ignore it.

 

Excuse me if I misunderstood you...

 

vapukov
Master II

Let's go step by step.

1) To close the SQL question, regardless of which form you use:

"SELECT datediff(day, date(Max(Trends2.TS)), date(getdate())) as last_load FROM Trends2" or "SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

 

With this SQL query you are requesting a difference in days, an INTEGER value such as 5 days,

so it cannot help you in future filters!!!

 

 

2) Even if you request just SELECT date(Max(Trends2.TS)), it returns the MAX date already in the database. So what is the reason to use this value in a SELECT from the same table? The result will always be empty.

 

3) To resolve the problem properly, we need to understand what you are trying to do:

- Is it a constant flow from sensors? In that case, why worry about the date? New data is always newer.

- Is it a batch process, where you parse logs and want to write only the new ones? In that case, why not sort the files by date and then just keep everything newer than the requested MAX(date)?

- If these are sensors, that normally means the date is not the only unique parameter, but the sensor ID as well?

- We know the size of your table. What about the new data? What size (number of new rows) per iteration?

INESBK
Creator
Author

Thanks for your reply.

1) The flow from the sensors is not constant. The number of files that represent the sensors is constant (about 3000 files), but each file contains a different number of rows.

I agree with you that new data is always newer, but in case of problems or an interruption of the execution (since I will launch the job every night) I need incremental loading to insert only the new rows.

2) Sorry, but I didn't understand your question about the batch process!

3) The ID of the sensor is composed of 4 fields: value of the sensor, quality, name, chrono (number of milliseconds since 1970)

4) The number of rows differs from one file to another (it can be anywhere from 2000 to 30 0000)

And since I will run the job every night to insert new sensor values, the table size will increase!

I hope I was a little clearer.

Thanks !

vapukov
Master II

Look, this problem is not related to Talend or to databases; it is just logic and process organisation.

 

Example:

You have 3000 files from different sensors.

You can have many files for one sensor, or you can have many sensors inside a single file. These are the variants for the logic.

 

Variant 1 - many files for the same sensor, but each file contains only one sensor

In this case you just need to:

1) request the MAX loading time from the database for this sensor

2) process the files related to this sensor one by one, starting from the oldest, and keep only the rows whose date is later than the one saved from the database

If the time ranges are not cleanly split across files, you must first merge all files for this sensor, and then do the same.
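
The filtering step of variant 1 can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the `Reading` type, the class `SingleSensorFilter`, and the sample values are hypothetical, and the timestamp is taken to be the `chrono` field (milliseconds since 1970) mentioned earlier in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of variant 1; not Talend code.
class SingleSensorFilter {

    // One row from a sensor file; chrono is milliseconds since 1970.
    static final class Reading {
        final long chrono;
        final double value;

        Reading(long chrono, double value) {
            this.chrono = chrono;
            this.value = value;
        }
    }

    // Keeps only rows strictly newer than the latest chrono already stored for this sensor.
    static List<Reading> newerThan(List<Reading> fileRows, long maxChronoInDb) {
        List<Reading> fresh = new ArrayList<>();
        for (Reading r : fileRows) {
            if (r.chrono > maxChronoInDb) {
                fresh.add(r);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        long maxChronoInDb = 2000L; // would come from a MAX query for this sensor
        List<Reading> rows = Arrays.asList(
            new Reading(1000L, 1.5), new Reading(2000L, 1.7), new Reading(3000L, 1.9));
        System.out.println(newerThan(rows, maxChronoInDb).size() + " new row(s)");
    }
}
```

The strict comparison skips a row whose timestamp equals the stored maximum, so if a run can be interrupted in the middle of rows sharing one timestamp, a unique key on the target table is a safer guard than the filter alone.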

 

Variant 2 - many sensors in the same file

1) select a file, again oldest first

2) check which sensors are inside - tUnique by sensor name

3) request from the database the MAX date for each sensor inside this file

4) filter the data by sensor name and date

In the case of variant 2 you can:

- select all sensors from the file

- run a query like SELECT MAX(date) as last_date, sensor_name FROM table GROUP BY sensor_name. It will return 1 row per sensor. You can save it with tHashOutput

- then use a tMap with the main flow from the file and a lookup from tHashInput, filtering on sensor name and on a date later than the saved one
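
The lookup described for variant 2 can be sketched in plain Java, with a `Map` playing the role of the in-memory hash lookup. The class `PerSensorFilter`, the row layout (sensor name, then chrono in milliseconds), and the rule that rows from a sensor with no stored date are all kept are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of variant 2; the Map stands in for the hash lookup.
class PerSensorFilter {

    // Each row is {sensorName, chronoMillis}. A row is kept when its sensor has no
    // stored maximum yet (assumed to be a new sensor) or when it is newer than that maximum.
    static List<String[]> filter(List<String[]> rows, Map<String, Long> lastBySensor) {
        List<String[]> kept = new ArrayList<>();
        for (String[] row : rows) {
            Long last = lastBySensor.get(row[0]);
            long chrono = Long.parseLong(row[1]);
            if (last == null || chrono > last) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Long> lastBySensor = new HashMap<>(); // one entry per sensor from the MAX query
        lastBySensor.put("sensorA", 2000L);

        List<String[]> rows = Arrays.asList(
            new String[] {"sensorA", "1500"},
            new String[] {"sensorA", "2500"},
            new String[] {"sensorB", "100"});

        for (String[] row : filter(rows, lastBySensor)) {
            System.out.println(row[0] + " " + row[1]);
        }
    }
}
```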

 

As you can see, in both variants you filter the input flow, not the database.

In both variants you must organise the processing of files sorted by date, or first merge the files and sort them by date, and then filter.

I use both of these variants in real life, so I can confirm that it all works as expected, but it always must be adjusted to the real structure.

 

 

 

INESBK
Creator
Author

For me it is variant 1: each file represents a sensor!

Could you explain the logic of the first proposal a little more?
Does this filter have to be in the tMap, connected to a tMSSqlInput that gets max(date) from the table?

vapukov
Master II

Keep in mind that I do not have your data, your structure, etc., so I am more or less trying to adapt my experience, but I am doing this blind.

 

If you want a full answer, always publish all the information.

 

Example:

sensor1_20170528_1300-1400.csv

sensor1_20170528_1400-1500.csv

sensor1_20170528_1500-1600.csv

etc

 

So we know:

- sensor1 is part of the file name

- we have 24 files per day

- the file name includes a date-time pattern

- sensors could be new, not all are already in the database (if not, we change the logic)

 

We do:

1) tFileList - prepare the list of files, extract the sensor names from the file names or from the first row inside each file, and make the list unique

Iterate over the array of sensor names:

2) tMSSQLRow - SELECT COALESCE(MAX(date), yesterday) as last_date from table WHERE sensor_name = variable_sensor_name, and save it to a variable

3) tFileList with the pattern:

variable_sensor_name+pattern_for_yesterday+.csv

- the pattern could be different, even for the past hour, etc.

We sort the files by time. We can assume that the earliest file name pattern has the earliest creation time (if not, we change the algorithm)

 

4) tFileInputDelimited -> tFilter (or tMap) where we use last_date as the filter condition -> tMSSQLOutput

Alternative for 4): tMSSQLInput with SELECT sensor_name, date FROM table WHERE sensor_name = variable_sensor_name AND date > yesterday

and use it as a lookup, as in your original job, for the INNER JOIN rejects
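
Steps 1) and 3) above can be sketched in plain Java, assuming file names shaped like the examples (`sensor1_20170528_1300-1400.csv`), where the sensor name is everything before the first underscore. The class `SensorFiles` and its methods are hypothetical. With a fixed-width date-time pattern, alphabetical order of the names is also chronological order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical illustration; assumes every file name contains an underscore.
class SensorFiles {

    // The sensor name is the part of the file name before the first underscore.
    static String sensorOf(String fileName) {
        return fileName.substring(0, fileName.indexOf('_'));
    }

    // Unique sensor names found in the file list.
    static SortedSet<String> sensors(List<String> fileNames) {
        SortedSet<String> names = new TreeSet<>();
        for (String f : fileNames) {
            names.add(sensorOf(f));
        }
        return names;
    }

    // Files of one sensor, oldest first (names sort chronologically with this pattern).
    static List<String> filesFor(String sensor, List<String> fileNames) {
        List<String> matching = new ArrayList<>();
        for (String f : fileNames) {
            if (f.startsWith(sensor + "_")) {
                matching.add(f);
            }
        }
        Collections.sort(matching);
        return matching;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
            "sensor1_20170528_1400-1500.csv",
            "sensor2_20170528_1300-1400.csv",
            "sensor1_20170528_1300-1400.csv");
        for (String sensor : sensors(files)) {
            System.out.println(sensor + " -> " + filesFor(sensor, files));
        }
    }
}
```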

 

This is only one of many variants:

- you may already have all sensor names in the database

- you may have no time patterns in the file names

- you may have more than one message for the same sensor with the same time (for this case it is good to have (sensor_name, date) as a primary or unique key)

- etc

 

For any combination, we draw a process diagram and look at how we can achieve the result,