ThomasLetellier
Contributor

Custom component: generate records from Kafka input in real time

Hello,

I'm developing a custom component using the Talend Toolkit to get data from a Kafka server. I'm able to connect to the Kafka server and to display the data (using System.out.println()), but I can't create records. Maybe you can give me some tips after looking at my Java code. This is the "Source" file:

@Producer
public Record next() {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));
        for (ConsumerRecord<String, Object> record : records) {
            JSONObject jsonObject = new JSONObject(current_line);
            test = jsonObject.get("type").toString();
            System.out.println("in= " + test);
            createRecord(test);
        }
        consumer.commitAsync();
    }
}

public Record createRecord(String var) {
    return builderFactory.newRecordBuilder().withString("name", var).build();
}

When I print the test value to the console, the value is correct and I get all the values from the consumer, but when I try to create a record using the method "createRecord" it does not work. When I launch my component in Talend, the values are printed but I get no records from the component.

I attached the java code to the post.

 

Regards,

Thomas

3 Replies
undx
Creator

Hi @thomas letellier​ ,

Your code does not follow the source component's logic/lifecycle. The framework's execution logic calls the method annotated with @Producer repeatedly, and each call should return a single record, so you don't need to return everything at once.

Just poll data and return a record in the Producer.
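
To illustrate the lifecycle described above, here is a minimal sketch of the "one record per call" pattern. It is not Talend or Kafka code: plain String values stand in for Record objects, and the BufferedSource class and its Supplier-based poller are hypothetical names that stand in for the component and for consumer.poll(). The idea is only that next() has no loop. It refills a buffer when the buffer is empty and hands back exactly one value.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical stand-in for a source component; not part of any Talend API.
class BufferedSource {
    // Assumption: one call to poller.get() plays the role of one consumer.poll().
    private final Supplier<List<String>> poller;
    // Values fetched by the last poll that have not been returned yet.
    private final Queue<String> buffer = new ArrayDeque<>();

    BufferedSource(Supplier<List<String>> poller) {
        this.poller = poller;
    }

    // Plays the role of the @Producer method: no loop, one value per call.
    String next() {
        if (buffer.isEmpty()) {
            // Poll once, only when everything buffered has been handed out.
            buffer.addAll(poller.get());
        }
        // Queue.poll() returns null when the buffer is still empty.
        return buffer.poll();
    }
}
```

In the real component the returned value would be built with builderFactory.newRecordBuilder() as in the question. How the framework treats a null return for a streaming source is something to check in the Talend Component Kit documentation.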

best regards

ThomasLetellier
Contributor
Author

Hello @Emmanuel GALLOIS​ , thank you for your answer !

You're right. I modified my Java code to return all the data after each poll (using an ArrayList), and it seems to work!

 

When I test my component I am able to get the results and display them in a tLogRow. The problem is that I would like to write the data to a tFileOutputDelimited (or a raw file, whatever), but when I connect my CustomInput component to an output file, the file remains empty. This is very weird because in debug mode I can see the data flowing with the correct values, but nothing arrives in the output file (from time to time I am able to write a row, but only about 10% of the whole dataset).

 

Do you know what is happening? There is no error and nothing in the logs.

 

Here is my new code:

@Producer
public Record next() {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(10));
        System.out.println("new poll");
        ArrayList<String> elements = new ArrayList<>();
        Integer count = 0;
        for (ConsumerRecord<String, Object> record : records) {
            count++;
            String current_line = record.value().toString();
            JSONObject jsonObject = new JSONObject(current_line);
            test = jsonObject.get("type").toString();
            elements.add(current_line);
            System.out.println("in= " + test);
        }
        System.out.println("count = " + count.toString());

        if (elements.size() > 0) {
            return builderFactory.newRecordBuilder().withString("name", elements.toString()).build();
        }
        consumer.commitAsync();
    }
}

0695b00000F9phoAAB.png 

 

 

 

Regards,

 

Thomas

undx
Creator

Hi Thomas,

Not sure I was clear 😉

Do not use a loop in the method annotated with @Producer.

I don't know Kafka well (and this is not really related to Kafka), but you should retrieve one record at a time, each time the @Producer method is called.

i.e. something like this (the code is only for understanding 😁):

 

@Producer
public Record next() {
    return builderFactory.newRecordBuilder().withString("name", giveMeOneMsgValue()).build();
}

public String giveMeOneMsgValue() {
    final int msgLimit = 1;
    final ConsumerRecordKind r = consumer.poll(msgLimit); // I don't know the API... you could use something like an iterator, depending on the API
    consumer.commitAsync();
    return r.value();
}
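
The iterator idea mentioned in the comment above could look roughly like the sketch below. It rests on assumptions: MessageSource is a hypothetical interface standing in for the Kafka consumer (its poll() and commit() mirror the consumer.poll(...) and consumer.commitAsync() calls from the question), BatchCursor is an invented name, and String values stand in for message payloads. The sketch keeps an iterator over the last polled batch and commits only once that batch has been fully handed out, which is one possible ordering and not necessarily the one the component should use.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the Kafka consumer used in the question.
interface MessageSource {
    List<String> poll();   // fetch the next batch, possibly empty
    void commit();         // acknowledge what has been consumed so far
}

// Hypothetical helper that hands out one message value per call.
class BatchCursor {
    private final MessageSource source;
    private Iterator<String> current = Collections.emptyIterator();
    private boolean pendingCommit = false;

    BatchCursor(MessageSource source) {
        this.source = source;
    }

    // Returns one message value, or null if the latest poll came back empty.
    String giveMeOneMsgValue() {
        if (!current.hasNext()) {
            if (pendingCommit) {
                // The previous batch is fully consumed: commit before polling again.
                source.commit();
                pendingCommit = false;
            }
            current = source.poll().iterator();
        }
        if (!current.hasNext()) {
            return null;
        }
        pendingCommit = true;
        return current.next();
    }
}
```

With this shape, the @Producer method stays a one-liner that wraps the returned value in a record, as in the reply above.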

 

Hope this helps!

Best regards