Hello,
I'm developing a custom component using Talend Toolkit to get data from a Kafka server. I'm able to connect to the Kafka server and to display the data (using System.out.println()), but I can't create records. Maybe you can give me some tips after looking at my Java code. This is the "Source" file:
@Producer
public Record next() {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(5000));
        for (ConsumerRecord<String, Object> record : records) {
            JSONObject jsonObject = new JSONObject(current_line);
            test = jsonObject.get("type").toString();
            System.out.println("in= " + test);
            createRecord(test);
        }
        consumer.commitAsync();
    }
}

public Record createRecord(String var) {
    return builderFactory.newRecordBuilder()
            .withString("name", var)
            .build();
}
When I print the test value to the console, the value is correct and I get all the values from the consumer, but when I try to create a record with the createRecord method, it doesn't work. When I launch my component in Talend, the values are printed but I get no records from the component.
I attached the java code to the post.
Regards,
Thomas
Hi @thomas letellier ,
Your code does not follow the source component's logic/lifecycle. The method with the @Producer annotation should return a single record per call. The framework's execution logic triggers it repeatedly, so you don't need to loop over and return everything yourself.
Just poll data and return a record in the Producer.
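To make that lifecycle concrete, here is a minimal plain-Java sketch. It uses no Talend or Kafka classes: OneAtATimeSource and ProducerDriver are hypothetical stand-ins for the source component and for the framework that calls it, and it assumes the caller stops when it receives null. The point is only that the loop belongs to the caller, while next() returns one value per call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a source component: one call, one record.
class OneAtATimeSource {
    private final Iterator<String> messages;

    OneAtATimeSource(List<String> messages) {
        this.messages = messages.iterator();
    }

    // Plays the role of the @Producer method: returns the next value,
    // or null when nothing is left.
    String next() {
        return messages.hasNext() ? messages.next() : null;
    }
}

// Hypothetical stand-in for the framework side that drives the source.
class ProducerDriver {
    static List<String> drain(OneAtATimeSource source) {
        List<String> emitted = new ArrayList<>();
        String record;
        // The loop lives here, in the caller, not inside next().
        while ((record = source.next()) != null) {
            emitted.add(record);
        }
        return emitted;
    }
}
```

In the original code the loop sits inside next() and the result of createRecord(test) is discarded, so nothing is ever returned to the caller.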
best regards
Hello @Emmanuel GALLOIS , thank you for your answer !
You're right. I modified my Java code to return all the data after each poll (using an ArrayList), and it seems to work!
When I test my component I am able to get the results and display them in a tLogRow. The problem is that I would like to write the data to a tFileOutputDelimited (or a raw file, whatever), but when I connect my CustomInput component to an output file, the file remains empty. This is very weird, because in debug mode I can see the data flow with the correct values, but nothing arrives in the output file (from time to time I am able to write a row, but only about 10% of the whole dataset).
Do you know what is happening? There is no error and nothing in the log.
Here is my new code:
@Producer
public Record next() {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofMillis(10));
        System.out.println("new poll");
        ArrayList<String> elements = new ArrayList<>();
        Integer count = 0;
        for (ConsumerRecord<String, Object> record : records) {
            count++;
            String current_line = record.value().toString();
            JSONObject jsonObject = new JSONObject(current_line);
            test = jsonObject.get("type").toString();
            elements.add(current_line);
            System.out.println("in= " + test);
        }
        System.out.println("count = " + count.toString());
        if (elements.size() > 0) {
            return builderFactory.newRecordBuilder().withString("name", elements.toString()).build();
        }
        consumer.commitAsync();
    }
}
Regards,
Thomas
Hi Thomas,
Not sure I was clear 😉
Do not use a loop in the method annotated with @Producer.
I don't know Kafka well (and this is not really related to Kafka), but you should retrieve one record at a time each time @Producer is called.
i.e. something like this (the code is really just for understanding 😁)
@Producer
public Record next() {
    return builderFactory.newRecordBuilder().withString("name", giveMeOneMsgValue()).build();
}

public String giveMeOneMsgValue() {
    final int msgLimit = 1;
    // don't know the API... could use something like an iterator or else, according to the API
    final ConsumerRecordKind r = consumer.poll(msgLimit);
    consumer.commitAsync();
    return r.value();
}
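Since a Kafka poll usually hands back a batch rather than a single message, one possible way to keep "one record per call" is to buffer the batch and serve it from a queue. The sketch below is plain Java under stated assumptions: BufferedSource is a hypothetical name, the Supplier stands in for consumer.poll(...), and returning null on an empty poll is an assumption whose meaning for the framework should be checked against the Talend documentation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Sketch: buffer a polled batch and hand out one value per call.
class BufferedSource {
    // Hypothetical stand-in for consumer.poll(...): each call returns one batch.
    private final Supplier<List<String>> poller;
    // Values already polled but not yet returned.
    private final Deque<String> buffer = new ArrayDeque<>();

    BufferedSource(Supplier<List<String>> poller) {
        this.poller = poller;
    }

    // Plays the role of the @Producer method.
    String next() {
        if (buffer.isEmpty()) {
            // Refill with a single poll; there is no loop in this method.
            buffer.addAll(poller.get());
        }
        // Returns null when the poll came back empty.
        return buffer.poll();
    }
}
```

In the real component, next() would wrap the returned value with builderFactory.newRecordBuilder() as in the snippet above, so each message becomes its own record instead of one record holding the whole list.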
Hope this will help u!
br