<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Custom component: generate records from Kafka input in real time in Talend Studio</title>
    <link>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252623#M148570</link>
    <description>&lt;P&gt;Hi Thomas,&lt;/P&gt;&lt;P&gt;I'm not sure I was clear &lt;span class="lia-unicode-emoji" title=":winking_face:"&gt;😉&lt;/span&gt;&lt;/P&gt;&lt;P&gt;Do not use a loop in the method annotated with @Producer.&lt;/P&gt;&lt;P&gt;I don't know Kafka well (and this is not really related to Kafka), but you should retrieve a single record each time the @Producer method is called.&lt;/P&gt;&lt;P&gt;That is, something like this (the code is only for illustration &lt;span class="lia-unicode-emoji" title=":beaming_face_with_smiling_eyes:"&gt;😁&lt;/span&gt;):&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;@Producer public Record next() { return builderFactory.newRecordBuilder().withString("name", &lt;B&gt;giveMeOneMsgValue()&lt;/B&gt;).build(); }&lt;/P&gt;&lt;P&gt;public String giveMeOneMsgValue() {&lt;/P&gt;&lt;P&gt;  final int msgLimit = 1;&lt;/P&gt;&lt;P&gt;  final ConsumerRecordKind r = consumer.poll(msgLimit);  // I don't know the API... you could use something like an iterator, depending on the API&lt;/P&gt;&lt;P&gt;  consumer.commitAsync();&lt;/P&gt;&lt;P&gt;  return r.value();&lt;/P&gt;&lt;P&gt;}&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Hope this helps!&lt;/P&gt;&lt;P&gt;Best regards&lt;/P&gt;</description>
    <pubDate>Tue, 01 Jun 2021 17:07:20 GMT</pubDate>
    <dc:creator>undx</dc:creator>
    <dc:date>2021-06-01T17:07:20Z</dc:date>
    <item>
      <title>Custom component: generate records from Kafka input in real time</title>
      <link>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252620#M148567</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I'm developing a custom component with the Talend Toolkit to read data from a Kafka server. I can connect to the Kafka server and display the data (using System.out.println()), but I can't create records. Maybe you can give me some tips after looking at my Java code. This is the "Source" file:&lt;/P&gt;&lt;P&gt;@Producer&lt;/P&gt;&lt;P&gt;public Record next() {&lt;/P&gt;&lt;P&gt;    while (true) {&lt;/P&gt;&lt;P&gt;        ConsumerRecords&amp;lt;String, Object&amp;gt; records = consumer.poll(Duration.ofMillis(5000));&lt;/P&gt;&lt;P&gt;        for (ConsumerRecord&amp;lt;String, Object&amp;gt; record : records) {&lt;/P&gt;&lt;P&gt;            JSONObject jsonObject = new JSONObject(current_line);&lt;/P&gt;&lt;P&gt;            test = jsonObject.get("type").toString();&lt;/P&gt;&lt;P&gt;            System.out.println("in= " + test);&lt;/P&gt;&lt;P&gt;            createRecord(test);&lt;/P&gt;&lt;P&gt;        }&lt;/P&gt;&lt;P&gt;        consumer.commitAsync();&lt;/P&gt;&lt;P&gt;    }&lt;/P&gt;&lt;P&gt;}&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;public Record createRecord(String var) {&lt;/P&gt;&lt;P&gt;    return builderFactory.newRecordBuilder()&lt;/P&gt;&lt;P&gt;        .withString("name", var)&lt;/P&gt;&lt;P&gt;        .build();&lt;/P&gt;&lt;P&gt;}&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;When I print the test value to the console, the value is correct and I get all the values from the consumer, but when I try to create a record with the "createRecord" method, it does not work. When I launch my component in Talend, the values are printed but I get no records from the component.&lt;/P&gt;&lt;P&gt;I attached the Java code to the post.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Thomas&lt;/P&gt;</description>
      <pubDate>Fri, 02 Jan 2026 14:45:42 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252620#M148567</guid>
      <dc:creator>ThomasLetellier</dc:creator>
      <dc:date>2026-01-02T14:45:42Z</dc:date>
    </item>
    <item>
      <title>Re: Custom component: generate records from Kafka input in real time</title>
      <link>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252621#M148568</link>
      <description>&lt;P&gt;Hi @thomas letellier,&lt;/P&gt;&lt;P&gt;Your code does not follow the lifecycle of a source component. The method annotated with @Producer should return a single record. It is triggered by the framework's execution logic, so you don't need to return everything at once.&lt;/P&gt;&lt;P&gt;Just poll the data and return one record from the @Producer method.&lt;/P&gt;&lt;P&gt;Best regards&lt;/P&gt;</description>
      <pubDate>Tue, 01 Jun 2021 07:26:17 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252621#M148568</guid>
      <dc:creator>undx</dc:creator>
      <dc:date>2021-06-01T07:26:17Z</dc:date>
    </item>
    <item>
      <title>Re: Custom component: generate records from Kafka input in real time</title>
      <link>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252622#M148569</link>
      <description>&lt;P&gt;Hello @Emmanuel GALLOIS, thank you for your answer!&lt;/P&gt;&lt;P&gt;You're right. I modified my Java code to return all the data after each poll (using an ArrayList), and it seems to work!&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;When I test my component, I am able to get the results and display them in a tLogRow. The problem is that I would like to write the data to a tFileOutputDelimited (or raw, whatever), but when I connect my CustomInput component to an output file, the file remains empty. This is very weird because in debug mode I can see the data flow with the correct values, but nothing arrives in the output file (from time to time I am able to write a row, but only about 10% of the whole dataset).&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Do you know what is happening? There is no error or log.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Here is my new code:&lt;/P&gt;&lt;P&gt;@Producer&lt;/P&gt;&lt;P&gt;public Record next() {&lt;/P&gt;&lt;P&gt;    while (true) {&lt;/P&gt;&lt;P&gt;        ConsumerRecords&amp;lt;String, Object&amp;gt; records = consumer.poll(Duration.ofMillis(10));&lt;/P&gt;&lt;P&gt;        System.out.println("new poll");&lt;/P&gt;&lt;P&gt;        ArrayList&amp;lt;String&amp;gt; elements = new ArrayList&amp;lt;&amp;gt;();&lt;/P&gt;&lt;P&gt;        Integer count = 0;&lt;/P&gt;&lt;P&gt;        for (ConsumerRecord&amp;lt;String, Object&amp;gt; record : records) {&lt;/P&gt;&lt;P&gt;            count++;&lt;/P&gt;&lt;P&gt;            String current_line = record.value().toString();&lt;/P&gt;&lt;P&gt;            JSONObject jsonObject = new JSONObject(current_line);&lt;/P&gt;&lt;P&gt;            test = jsonObject.get("type").toString();&lt;/P&gt;&lt;P&gt;            elements.add(current_line);&lt;/P&gt;&lt;P&gt;            System.out.println("in= " + test);&lt;/P&gt;&lt;P&gt;        }&lt;/P&gt;&lt;P&gt;        System.out.println("count = " + count.toString());&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;        if (elements.size() &amp;gt; 0) {&lt;/P&gt;&lt;P&gt;            return builderFactory.newRecordBuilder().withString("name", elements.toString()).build();&lt;/P&gt;&lt;P&gt;        }&lt;/P&gt;&lt;P&gt;        consumer.commitAsync();&lt;/P&gt;&lt;P&gt;    }&lt;/P&gt;&lt;P&gt;}&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0695b00000F9phoAAB.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/147877i00ABC2C725153269/image-size/large?v=v2&amp;amp;px=999" role="button" title="0695b00000F9phoAAB.png" alt="0695b00000F9phoAAB.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Thomas&lt;/P&gt;</description>
      <pubDate>Tue, 01 Jun 2021 08:28:43 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252622#M148569</guid>
      <dc:creator>ThomasLetellier</dc:creator>
      <dc:date>2021-06-01T08:28:43Z</dc:date>
    </item>
    <item>
      <title>Re: Custom component: generate records from Kafka input in real time</title>
      <link>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252623#M148570</link>
      <description>&lt;P&gt;Hi Thomas,&lt;/P&gt;&lt;P&gt;I'm not sure I was clear &lt;span class="lia-unicode-emoji" title=":winking_face:"&gt;😉&lt;/span&gt;&lt;/P&gt;&lt;P&gt;Do not use a loop in the method annotated with @Producer.&lt;/P&gt;&lt;P&gt;I don't know Kafka well (and this is not really related to Kafka), but you should retrieve a single record each time the @Producer method is called.&lt;/P&gt;&lt;P&gt;That is, something like this (the code is only for illustration &lt;span class="lia-unicode-emoji" title=":beaming_face_with_smiling_eyes:"&gt;😁&lt;/span&gt;):&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;@Producer public Record next() { return builderFactory.newRecordBuilder().withString("name", &lt;B&gt;giveMeOneMsgValue()&lt;/B&gt;).build(); }&lt;/P&gt;&lt;P&gt;public String giveMeOneMsgValue() {&lt;/P&gt;&lt;P&gt;  final int msgLimit = 1;&lt;/P&gt;&lt;P&gt;  final ConsumerRecordKind r = consumer.poll(msgLimit);  // I don't know the API... you could use something like an iterator, depending on the API&lt;/P&gt;&lt;P&gt;  consumer.commitAsync();&lt;/P&gt;&lt;P&gt;  return r.value();&lt;/P&gt;&lt;P&gt;}&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Hope this helps!&lt;/P&gt;&lt;P&gt;Best regards&lt;/P&gt;</description>
      <pubDate>Tue, 01 Jun 2021 17:07:20 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/Custom-component-generate-records-from-Kafka-input-in-real-time/m-p/2252623#M148570</guid>
      <dc:creator>undx</dc:creator>
      <dc:date>2021-06-01T17:07:20Z</dc:date>
    </item>
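    <!--
    Editorial sketch, not part of the original thread. The advice above (no loop inside the @Producer method, one record per call) can be illustrated with a small buffering pattern: poll at most once per call, keep the polled batch in a queue, and hand out a single value each time the method is invoked. The sketch below is plain Java with no Kafka or Talend dependency. The class OneAtATimeSource, its poller argument, and the demo class are hypothetical names introduced only for illustration; the poller stands in for consumer.poll(...), and strings stand in for records. Whether returning null on an empty poll is appropriate for a streaming source depends on the framework's contract, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical stand-in for a source component: hands out one value per call,
 * refilling an internal buffer from a batch-oriented poller when it runs dry.
 */
class OneAtATimeSource {
    // Hypothetical poller; in the thread this role is played by consumer.poll(...).
    private final Supplier<List<String>> poller;
    // Values from the last batch that have not been handed out yet.
    private final Deque<String> pending = new ArrayDeque<>();

    OneAtATimeSource(Supplier<List<String>> poller) {
        this.poller = poller;
    }

    /** Returns the next value, or null if the latest poll produced nothing. */
    String next() {
        if (pending.isEmpty()) {
            pending.addAll(poller.get()); // at most one poll per call, no loop
        }
        return pending.poll(); // null when the buffer is still empty
    }
}

class OneAtATimeSourceDemo {
    public static void main(String[] args) {
        // Simulated batches: two values, then an empty poll, then one value.
        Deque<List<String>> batches = new ArrayDeque<>(
                List.of(List.of("a", "b"), List.<String>of(), List.of("c")));
        OneAtATimeSource source = new OneAtATimeSource(
                () -> batches.isEmpty() ? List.<String>of() : batches.poll());

        // Each call yields a single value (or null), mirroring one @Producer call.
        for (int i = 0; i < 5; i++) {
            System.out.println(source.next());
        }
    }
}
```

    The second version of the code in the thread concatenates a whole batch into one record with elements.toString(). Buffering the batch as sketched here would instead emit each polled value as its own record, which is closer to the one-record-per-call advice.
    -->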
  </channel>
</rss>

