<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Huge data processing in Talend using tJavaFlex- Kafka streaming in Talend Studio</title>
    <link>https://community.qlik.com/t5/Talend-Studio/Huge-data-processing-in-Talend-using-tJavaFlex-Kafka-streaming/m-p/2327347#M96732</link>
    <description>&lt;P&gt;I am using tJavaflex component to build kafka consumer. Following is the code snippet for polling the stream to read the messages&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;// here is the main part of the component,&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;// a piece of code executed in the row&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;// loop&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;try&lt;/B&gt;&lt;B&gt; {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    &lt;/B&gt;&lt;B&gt;while&lt;/B&gt;&lt;B&gt; (&lt;/B&gt;&lt;B&gt;true&lt;/B&gt;&lt;B&gt;) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    ConsumerRecords&amp;lt;String, String&amp;gt; records = consumer.poll(context.consumer_polltime);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;     &lt;/B&gt;&lt;B&gt;if&lt;/B&gt;&lt;B&gt; (records.isEmpty()) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;     System.out.println(&lt;/B&gt;&lt;B&gt;"inside break"&lt;/B&gt;&lt;B&gt;);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;B&gt;break&lt;/B&gt;&lt;B&gt;;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;        };&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;        &lt;/B&gt;&lt;B&gt;for&lt;/B&gt;&lt;B&gt; (ConsumerRecord&amp;lt;String, String&amp;gt; record : records) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            i++;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            listPayload.add(record.value());        &lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;        }&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    }&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;globalMap.put(&lt;/B&gt;&lt;B&gt;"ConsumerObj"&lt;/B&gt;&lt;B&gt;,consumer);  &lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;}   &lt;/B&gt;&lt;B&gt;catch&lt;/B&gt;&lt;B&gt; (Exception e) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            System.out.println(&lt;/B&gt;&lt;B&gt;"Error Consuming Msg: "&lt;/B&gt;&lt;B&gt; + e);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;B&gt;// &lt;/B&gt;&lt;B&gt;TODO:&lt;/B&gt;&lt;B&gt; handle exception&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;B&gt;//consumer.close();&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    }&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    &lt;/B&gt;&lt;B&gt;finally&lt;/B&gt;&lt;B&gt; {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;  consumer.close();&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;}&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;row21.payload= String.valueOf(listPayload);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;globalMap.put(&lt;/B&gt;&lt;B&gt;"MsgCount"&lt;/B&gt;&lt;B&gt;,i);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The output of this component is redirected to tExtractJsonfields as my stream contains json messages&lt;/P&gt;&lt;P&gt;The issue with the above code is that as the number of messages increases, heap space is completely occupied by the&amp;nbsp;&lt;B&gt;ArrayList&lt;/B&gt;&amp;nbsp;and I am getting the following error&lt;/P&gt;&lt;P&gt;&lt;B&gt;Exception in thread "main" java.lang.OutOfMemoryError: Java heap space&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Please suggest on how I can push messages after every poll to the next step for processing, clear the list and continue the loop to read further messages.&lt;/P&gt;</description>
    <pubDate>Sat, 16 Nov 2024 00:33:42 GMT</pubDate>
    <dc:creator>STelkar1613587356</dc:creator>
    <dc:date>2024-11-16T00:33:42Z</dc:date>
    <item>
      <title>Huge data processing in Talend using tJavaFlex- Kafka streaming</title>
      <link>https://community.qlik.com/t5/Talend-Studio/Huge-data-processing-in-Talend-using-tJavaFlex-Kafka-streaming/m-p/2327347#M96732</link>
      <description>&lt;P&gt;I am using tJavaflex component to build kafka consumer. Following is the code snippet for polling the stream to read the messages&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;// here is the main part of the component,&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;// a piece of code executed in the row&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;// loop&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;try&lt;/B&gt;&lt;B&gt; {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    &lt;/B&gt;&lt;B&gt;while&lt;/B&gt;&lt;B&gt; (&lt;/B&gt;&lt;B&gt;true&lt;/B&gt;&lt;B&gt;) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    ConsumerRecords&amp;lt;String, String&amp;gt; records = consumer.poll(context.consumer_polltime);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;     &lt;/B&gt;&lt;B&gt;if&lt;/B&gt;&lt;B&gt; (records.isEmpty()) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;     System.out.println(&lt;/B&gt;&lt;B&gt;"inside break"&lt;/B&gt;&lt;B&gt;);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;B&gt;break&lt;/B&gt;&lt;B&gt;;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;        };&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;        &lt;/B&gt;&lt;B&gt;for&lt;/B&gt;&lt;B&gt; (ConsumerRecord&amp;lt;String, String&amp;gt; record : records) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            i++;&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            listPayload.add(record.value());        &lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;        }&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    }&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;globalMap.put(&lt;/B&gt;&lt;B&gt;"ConsumerObj"&lt;/B&gt;&lt;B&gt;,consumer);  &lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;}   &lt;/B&gt;&lt;B&gt;catch&lt;/B&gt;&lt;B&gt; (Exception e) {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            System.out.println(&lt;/B&gt;&lt;B&gt;"Error Consuming Msg: "&lt;/B&gt;&lt;B&gt; + e);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;B&gt;// &lt;/B&gt;&lt;B&gt;TODO:&lt;/B&gt;&lt;B&gt; handle exception&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;            &lt;/B&gt;&lt;B&gt;//consumer.close();&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    }&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;    &lt;/B&gt;&lt;B&gt;finally&lt;/B&gt;&lt;B&gt; {&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;  consumer.close();&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;}&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;row21.payload= String.valueOf(listPayload);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;B&gt;globalMap.put(&lt;/B&gt;&lt;B&gt;"MsgCount"&lt;/B&gt;&lt;B&gt;,i);&lt;/B&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The output of this component is redirected to tExtractJsonfields as my stream contains json messages&lt;/P&gt;&lt;P&gt;The issue with the above code is that as the number of messages increases, heap space is completely occupied by the&amp;nbsp;&lt;B&gt;ArrayList&lt;/B&gt;&amp;nbsp;and I am getting the following error&lt;/P&gt;&lt;P&gt;&lt;B&gt;Exception in thread "main" java.lang.OutOfMemoryError: Java heap space&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Please suggest on how I can push messages after every poll to the next step for processing, clear the list and continue the loop to read further messages.&lt;/P&gt;</description>
      <pubDate>Sat, 16 Nov 2024 00:33:42 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/Huge-data-processing-in-Talend-using-tJavaFlex-Kafka-streaming/m-p/2327347#M96732</guid>
      <dc:creator>STelkar1613587356</dc:creator>
      <dc:date>2024-11-16T00:33:42Z</dc:date>
    </item>
    <item>
      <title>Re: Huge data processing in Talend using tJavaFlex- Kafka streaming</title>
      <link>https://community.qlik.com/t5/Talend-Studio/Huge-data-processing-in-Talend-using-tJavaFlex-Kafka-streaming/m-p/2327348#M96733</link>
      <description>&lt;P&gt;Hi @Sharad Telkar​&amp;nbsp;would you mind posting the code for your "Start Code", "Main Code" and "End Code" sections please? It is important how these are coded. Also, it should be pointed out that a Talend Job being left to continually receive Kafka messages may not be the best method for consuming theses messages. Talend ESB may be a better option for this. But lets see if we can get going with this method first.&lt;/P&gt;</description>
      <pubDate>Mon, 22 Feb 2021 10:33:00 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/Huge-data-processing-in-Talend-using-tJavaFlex-Kafka-streaming/m-p/2327348#M96733</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2021-02-22T10:33:00Z</dc:date>
    </item>
  </channel>
</rss>

