
Qlik REST connector for Elasticsearch using custom pagination and scroll ID

After reading through - Re: Pulling data from Elastic Search

I tried to use the Qlik REST connector for Elasticsearch with custom pagination and a scroll ID, but it does not pull all the data. My script seems to load only the initial set of Elasticsearch documents.

Here is my load script.

I first get the total document count from the _count endpoint:

CUSTOM CONNECT TO "Provider=QvRestConnector.exe;url=http://appedwdevbal898.ds.susq.com:9200/application_log/_count?;timeout=30;method=GET;autoDetectResp...";

RestConnectorMasterTable:
SQL SELECT
  "count",
  "__KEY_root",
  (SELECT
    "total",
    "successful",
    "failed",
    "__FK__shards"
  FROM "_shards" FK "__FK__shards")
FROM JSON (wrap on) "root" PK "__KEY_root";

[countroot]:
LOAD [count] as totalCount,
  [__KEY_root] as countKeyRoot
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__KEY_root]);

DROP TABLE RestConnectorMasterTable;
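
Judging from the fields selected above, the _count response carries a count plus a _shards summary. The Python sketch below is only an illustration outside Qlik of how that payload could be read; the sample numbers and the helper name read_total_count are assumptions made for the example, not values from this cluster. It returns the count and refuses to trust it when a shard reports a failure.

```python
import json

# Sample payload limited to the fields the load script selects.
# The numbers are invented for illustration.
SAMPLE_COUNT_RESPONSE = (
    '{"count": 250, "_shards": {"total": 5, "successful": 5, "failed": 0}}'
)


def read_total_count(raw: str) -> int:
    """Return the document count from a _count style JSON response.

    Raises ValueError when any shard failed, because the count would
    then be incomplete and the paging loop would stop too early.
    """
    body = json.loads(raw)
    shards = body["_shards"]
    if shards["failed"] > 0:
        raise ValueError(
            f"{shards['failed']} of {shards['total']} shards failed; count is partial"
        )
    return body["count"]


if __name__ == "__main__":
    print(read_total_count(SAMPLE_COUNT_RESPONSE))
```

In the load script, the equivalent value is what ends up in totalCount.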

-----------------------------------------------------------------------------------------------------------------

I then use this totalCount in the main loop as follows -

CUSTOM CONNECT TO "Provider=QvRestConnector.exe;url=http://appedwdevbal898.ds.susq.com:9200/application_log/_search?scroll%25m;timeout=30;method=GET;aut...";

let totalCount = peek('totalCount');
let current_scroll_id = '';
let totalfetched = 0;
Let pageSize = 100000;

do while totalfetched < totalCount

RestConnectorMasterTable:
SQL SELECT
  "_scroll_id",
  "took",
  "timed_out",
  "__KEY_root",
  (SELECT
    "total",
    "successful",
    "failed",
    "__FK__shards"
  FROM "_shards" FK "__FK__shards"),
  (SELECT
    "total" AS "total_u0",
    "max_score",
    "__KEY_hits",
    "__FK_hits",
    (SELECT
      "_index",
      "_type",
      "_id",
      "_score",
      "_ttl",
      "__KEY_hits_u0",
      "__FK_hits_u0",
      (SELECT
        "message",
        "@version",
        "@timestamp",
        "source",
        "type",
        "host",
        "thread",
        "severity",
        "class",
        "__KEY__source",
        "__FK__source",
        (SELECT
          "hostname",
          "name",
          "__FK_beat"
        FROM "beat" FK "__FK_beat"),
        (SELECT
          "@Value",
          "__FK_tags"
        FROM "tags" FK "__FK_tags" ArrayValueAlias "@Value")
      FROM "_source" PK "__KEY__source" FK "__FK__source")
    FROM "hits" PK "__KEY_hits_u0" FK "__FK_hits_u0")
  FROM "hits" PK "__KEY_hits" FK "__FK_hits")
FROM JSON (wrap on) "root" PK "__KEY_root"
WITH CONNECTION(Url "http://appedwdevbal898.ds.susq.com:9200/application_log/_search?scroll=5m&scroll_id=$(current_scroll...)");

current_scroll_id = Peek('_scroll_id');
totalfetched = totalfetched + pageSize;

loop;

[_shards]:
LOAD [total],
  [successful],
  [failed],
  [__FK__shards] AS [__KEY_root]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK__shards]);

[beat]:
LOAD [hostname],
  [name],
  [__FK_beat] AS [__KEY__source]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK_beat]);

[tags]:
LOAD [@Value],
  [__FK_tags] AS [__KEY__source]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK_tags]);

[_source]:
LOAD [message],
  [@version],
  [@timestamp],
  [source],
  [type],
  [host],
  [thread],
  [severity],
  [class],
  [__KEY__source],
  [__FK__source] AS [__KEY_hits_u0]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK__source]);

[hits]:
LOAD [_index],
  [_type],
  [_id],
  [_score],
  [_ttl],
  [__KEY_hits_u0],
  [__FK_hits_u0] AS [__KEY_hits]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK_hits_u0]);

[hits_u0]:
LOAD [total_u0],
  [max_score],
  [__KEY_hits],
  [__FK_hits] AS [__KEY_root]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK_hits]);

[root]:
LOAD [_scroll_id],
  [took],
  [timed_out],
  [__KEY_root]
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__KEY_root]);

DROP TABLE RestConnectorMasterTable;

--------------------------------------------------

Please help!

1 Solution

Accepted Solutions
Not applicable
Author

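Sorry, I should have posted the resolution earlier.

I was able to resolve it by making a small change to the loop. It now calls the _search/scroll endpoint and sets totalfetched from the highest hit key loaded so far instead of adding pageSize on every pass: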

do while totalfetched < totalCount

RestConnectorMasterTable:
SQL SELECT
  "_scroll_id",
  "__KEY_root",
  (SELECT
    "__FK_hits",
    "__KEY_hits",
    (SELECT
      "__KEY_hits_u0",
      "__FK_hits_u0",
      (SELECT
        "message",
        "@version",
        "@timestamp",
        "source",
        "type",
        "host",
        "thread",
        "severity",
        "class",
        "__FK__source",
        "__KEY__source"
      FROM "_source" PK "__KEY__source" FK "__FK__source")
    FROM "hits" PK "__KEY_hits_u0" FK "__FK_hits_u0")
  FROM "hits" PK "__KEY_hits" FK "__FK_hits")
FROM JSON (wrap on) "root" PK "__KEY_root"
WITH CONNECTION(Url "http://appedwdevbal898.ds.susq.com:9200/_search/scroll?scroll=1m&scroll_id=$(current_scroll_id)&size...)");

let current_scroll_id = Peek('_scroll_id');

hitsinThisIteration:
LOAD max([__KEY_hits_u0]) as 'hitscount'
RESIDENT RestConnectorMasterTable
WHERE NOT IsNull([__FK_hits_u0]);

let totalfetched = peek('hitscount');

drop Table hitsinThisIteration;

loop;
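
One plausible reading of why the counter matters: if each response carries fewer hits than pageSize, adding pageSize on every pass makes the loop believe it is finished after the first request. The Python sketch below models only that bookkeeping, outside Qlik. The function name simulate_loop and the numbers in the usage lines are assumptions chosen for illustration; they are not measurements from this cluster.

```python
from typing import Optional, Tuple


def simulate_loop(
    total_count: int,
    hits_per_response: int,
    counter_step: Optional[int] = None,
) -> Tuple[int, int]:
    """Model a 'do while totalfetched < totalCount' paging loop.

    counter_step=None  -> the counter tracks rows actually loaded.
    counter_step=N     -> the counter grows by a fixed N per request,
                          regardless of how many rows arrived.
    Returns (requests_made, docs_loaded).
    """
    fetched_counter = 0  # what the script believes it has fetched
    docs_loaded = 0      # what really arrived
    requests = 0
    while fetched_counter < total_count:
        page = min(hits_per_response, total_count - docs_loaded)
        if page == 0:
            break  # server has nothing left; stop even if the counter lags
        docs_loaded += page
        requests += 1
        if counter_step is None:
            fetched_counter = docs_loaded
        else:
            fetched_counter += counter_step
    return requests, docs_loaded


if __name__ == "__main__":
    # Fixed step far larger than the real page: loop exits after one request.
    print(simulate_loop(250, 10, counter_step=100000))
    # Counter follows the rows actually loaded: every page is requested.
    print(simulate_loop(250, 10))
```

The second call corresponds to deriving totalfetched from the loaded rows, which is what the max([__KEY_hits_u0]) step does, assuming the connector's generated keys keep increasing across iterations.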


7 Replies
Anonymous
Not applicable
Author

Hi!

It seems that I have exactly the same error.

Did you resolve your issue?

Thanks for your reply.


Anonymous
Not applicable
Author

Hello,

Thank you for your answer.

I will try to adapt it to my code.

Thank you again, and I'll keep you informed.

Have a nice day.

Anonymous
Not applicable
Author

Just a question, because it doesn't work for me: which version of QlikView do you use?

Thanks for your answer !

prabhu_pandian8
Partner - Contributor II

Hi,

I am trying to achieve something similar. My question is: the first time the script enters the loop, the scroll ID is empty, right? So how does it work? Do we need to fetch the scroll ID beforehand?

Regards,

Prabhu

Anonymous
Not applicable
Author

Hi Prabhu,

Good question. Did you get it working?

qiestro-engeeni
Contributor

Hi everyone, you can fetch the first _scroll_id by creating a temporary table before RestConnectorMasterTable:

scrollTable:
SQL SELECT
  "_scroll_id" AS "scrollid",
  (SELECT
    "total" AS "total_u1"
  FROM "hits")
FROM JSON (wrap on) "root";

Let total = Peek('total_u1',0,'scrollTable');
Let scrollid = Peek('scrollid',1,'scrollTable');

You can then use the scrollid variable in the WITH CONNECTION query!
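
The order of requests this describes (one initial search that opens the scroll and returns the first ID, then repeated scroll calls that each pass the most recent ID) can be sketched outside Qlik as follows. This is a minimal Python model, not Qlik script and not a real Elasticsearch client: FakeScrollIndex, its methods, and fetch_all are hypothetical names invented for the example, and the in-memory index only imitates the response fields used in this thread (_scroll_id, hits.total, hits.hits).

```python
from typing import Dict, List


class FakeScrollIndex:
    """In-memory stand-in for an index that supports scrolling (assumption)."""

    def __init__(self, docs: List[dict], page_size: int) -> None:
        self._docs = docs
        self._page_size = page_size
        self._cursors: Dict[str, int] = {}  # scroll id -> next offset
        self._next_id = 0

    def _page(self, offset: int) -> dict:
        hits = self._docs[offset:offset + self._page_size]
        scroll_id = f"scroll-{self._next_id}"
        self._next_id += 1
        self._cursors[scroll_id] = offset + len(hits)
        return {
            "_scroll_id": scroll_id,
            "hits": {"total": len(self._docs), "hits": hits},
        }

    def search(self) -> dict:
        """Initial request: no scroll id needed; returns page one and an id."""
        return self._page(0)

    def scroll(self, scroll_id: str) -> dict:
        """Follow-up request: requires the id from the previous response."""
        offset = self._cursors.pop(scroll_id)  # KeyError for an empty/unknown id
        return self._page(offset)


def fetch_all(index: FakeScrollIndex) -> List[dict]:
    """Open the scroll first, then page until every document is loaded."""
    response = index.search()
    total = response["hits"]["total"]
    rows = list(response["hits"]["hits"])
    while len(rows) < total:
        # Always pass the id from the latest response.
        response = index.scroll(response["_scroll_id"])
        page = response["hits"]["hits"]
        if not page:
            break  # nothing more to read
        rows.extend(page)
    return rows


if __name__ == "__main__":
    docs = [{"n": i} for i in range(25)]
    print(len(fetch_all(FakeScrollIndex(docs, page_size=10))))
```

In the load script, the scrollTable query plays the role of search() here, and the loop body plays the role of scroll().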