ThomasLetellier
Contributor

Custom component: generate records from Kafka input in real time

Hello,

I'm developing a custom component using Talend Toolkit to get data from a Kafka server. I'm able to connect to the Kafka server and to display the data (using System.out.println()), but I can't create records. Maybe you can give me some tips after looking at my Java code. This is the "Source" file:

@Producer
public Record next() {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));
        for (ConsumerRecord<String, Object> record : records) {
            JSONObject jsonObject = new JSONObject(current_line);
            test = jsonObject.get("type").toString();
            System.out.println("in= " + test);
            createRecord(test);
        }
        consumer.commitAsync();
    }
}

public Record createRecord(String var) {
    return builderFactory.newRecordBuilder().withString("name", var).build();
}

When I print the test value to the console, the value is OK and I get all the values from the consumer, but when I try to create a record using the method "createRecord", it does not work. When I launch my component in Talend, the values are printed but I get no records from the component.

I attached the Java code to the post.

 

Regards,

Thomas

3 Replies
undx
Creator

Hi @thomas letellier​ ,

Your code does not follow the source component's logic/lifecycle. The method with the @Producer annotation should return a single record. The framework's execution logic calls it repeatedly, so you don't need to return everything in one call.

Just poll data and return a record in the Producer.
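To make the lifecycle concrete, here is a minimal plain-Java sketch with no Talend or Kafka dependency. OneAtATimeSource and FrameworkDriver are hypothetical names invented for this illustration: the first plays the role of the @Producer method, the second plays the role of the framework that owns the loop. It assumes that returning null means "nothing more to emit", which is how I understand the producer contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a source component (not a Talend class).
class OneAtATimeSource {
    private final Iterator<String> messages;

    OneAtATimeSource(List<String> messages) {
        this.messages = messages.iterator();
    }

    // Plays the role of the @Producer method: no loop, one value per call,
    // null when there is nothing left to return.
    String next() {
        return messages.hasNext() ? messages.next() : null;
    }
}

// Hypothetical stand-in for the framework: it owns the loop, not the source.
class FrameworkDriver {
    static List<String> drain(OneAtATimeSource source) {
        List<String> emitted = new ArrayList<>();
        String record;
        while ((record = source.next()) != null) {
            emitted.add(record); // each returned value goes downstream
        }
        return emitted;
    }
}
```

The point is only the division of labour: the loop lives in the caller, and every value returned by next() is handed downstream, whereas a value built inside an inner loop and never returned is simply dropped.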

best regards

ThomasLetellier
Contributor
Author

Hello @Emmanuel GALLOIS​ , thank you for your answer !

You're right. I modified my Java code to return all the data after each poll (using an ArrayList), and it seems to work!

When I test my component, I am able to get the results and display them in a tLogRow. The problem is that I would like to write the data to a tFileOutputDelimited (or a raw file, whatever), but when I connect my CustomInput component to an output file, the file remains empty. This is very weird, because in debug mode I can see the data flowing with the correct values, yet nothing arrives in the output file (from time to time I am able to write a row, but only about 10% of the whole dataset).

Do you know what is happening? There is no error and nothing in the logs.

 

Here is my new code:

@Producer
public Record next() {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(10));
        System.out.println("new poll");
        ArrayList<String> elements = new ArrayList<>();
        Integer count = 0;
        for (ConsumerRecord<String, Object> record : records) {
            count++;
            String current_line = record.value().toString();
            JSONObject jsonObject = new JSONObject(current_line);
            test = jsonObject.get("type").toString();
            elements.add(current_line);
            System.out.println("in= " + test);
        }
        System.out.println("count = " + count.toString());

        if (elements.size() > 0) {
            return builderFactory.newRecordBuilder().withString("name", elements.toString()).build();
        }
        consumer.commitAsync();
    }
}


 

 

 

Regards,

 

Thomas

undx
Creator

Hi Thomas,

I'm not sure I was clear 😉

Do not use a loop in the method annotated with @Producer.

I don't know Kafka well (and this isn't really Kafka-specific), but you should retrieve one record at a time, each time the @Producer method is called.

That is, something like this (the code is only for illustration 😁):

 

@Producer
public Record next() {
    return builderFactory.newRecordBuilder().withString("name", giveMeOneMsgValue()).build();
}

public String giveMeOneMsgValue() {
    final int msgLimit = 1;
    // I don't know the API; you could use something like an iterator, depending on what it offers
    final ConsumerRecordKind r = consumer.poll(msgLimit);
    consumer.commitAsync();
    return r.value();
}
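Since a Kafka poll usually hands back a batch rather than a single message, one way to get "one record per call" is to keep the batch in a field and serve it one item at a time, polling again only when it is empty. Below is a rough plain-Java sketch of that idea. BufferedProducer and its poller argument are hypothetical stand-ins invented for the example: in a real component the poller would wrap something like consumer.poll(Duration) (whose result is iterable), and the returned String would be wrapped in a Record through builderFactory.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper (not a Talend or Kafka class): serves one message per call.
class BufferedProducer {
    // Assumption: stands in for a call such as consumer.poll(...), returning one batch.
    private final Supplier<List<String>> poller;
    // Messages from the last batch that have not been handed out yet.
    private Iterator<String> buffer = Collections.emptyIterator();

    BufferedProducer(Supplier<List<String>> poller) {
        this.poller = poller;
    }

    // Plays the role of the @Producer method: no loop, at most one poll per call.
    String next() {
        if (!buffer.hasNext()) {
            buffer = poller.get().iterator(); // refill only when the batch is used up
        }
        // null means "nothing available right now"
        return buffer.hasNext() ? buffer.next() : null;
    }
}
```

With batches ["m1", "m2"], [] and ["m3"], successive calls give m1, m2, null, m3: nothing from a batch is lost between calls, and an empty poll just yields null instead of blocking in a loop. When to commit offsets is left out on purpose, since that depends on the delivery guarantees you need.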

 

Hope this helps!

br