I am using a tJavaFlex component to build a Kafka consumer. The following code snippet polls the stream to read the messages:
// here is the main part of the component,
// a piece of code executed in the row
// loop
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
        if (records.isEmpty()) {
            System.out.println("inside break");
            break;
        }
        for (ConsumerRecord<String, String> record : records) {
            i++;
            listPayload.add(record.value());
        }
    }
    globalMap.put("ConsumerObj", consumer);
} catch (Exception e) {
    System.out.println("Error Consuming Msg: " + e);
    // TODO: handle exception
    //consumer.close();
} finally {
    consumer.close();
}
row21.payload = String.valueOf(listPayload);
globalMap.put("MsgCount", i);
The output of this component is redirected to tExtractJSONFields, as my stream contains JSON messages.
The issue with the above code is that, as the number of messages increases, the heap space is completely occupied by the ArrayList, and I get the following error:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
Please suggest how I can push the messages to the next step for processing after every poll, clear the list, and continue the loop to read further messages.
Hi @Sharad Telkar, would you mind posting the code for your "Start Code", "Main Code" and "End Code" sections, please? How these are coded matters. Also, it should be pointed out that leaving a Talend Job running to continually receive Kafka messages may not be the best method for consuming these messages. Talend ESB may be a better option for this. But let's see if we can get going with this method first.
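For illustration only, here is a minimal sketch of the per-poll hand-off asked about above: each message goes downstream as soon as it is read, so no list grows across polls. It deliberately uses no Kafka classes. `BatchPollSketch`, `drain` and `fakeSource` are hypothetical names, the `Supplier` stands in for `consumer.poll(...)`, and the `Consumer` stands in for assigning `row21.payload`. In a tJavaFlex, the loop opening would presumably go in "Start Code", the per-record assignment in "Main Code", and the closing braces plus `consumer.close()` in "End Code". That mapping is an assumption, since those sections have not been posted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the polling loop; no Kafka classes are used.
class BatchPollSketch {

    /**
     * Drains the source batch by batch and hands every message to the
     * downstream handler immediately, so nothing accumulates across polls.
     * Returns the number of messages handled.
     */
    static int drain(Supplier<List<String>> poll, Consumer<String> downstream) {
        int count = 0;
        while (true) {
            List<String> batch = poll.get();   // stands in for consumer.poll(...)
            if (batch.isEmpty()) {
                break;                         // same exit condition as the original loop
            }
            for (String payload : batch) {
                downstream.accept(payload);    // stands in for row21.payload = record.value()
                count++;
            }
            // The batch is no longer referenced after this point,
            // so it becomes eligible for garbage collection.
        }
        return count;
    }

    /** Builds a fake poll source that returns the given batches, then empty lists. */
    static Supplier<List<String>> fakeSource(List<List<String>> batches) {
        Queue<List<String>> queue = new ArrayDeque<>(batches);
        return () -> queue.isEmpty() ? Collections.<String>emptyList() : queue.poll();
    }

    public static void main(String[] args) {
        Supplier<List<String>> source = fakeSource(Arrays.asList(
                Arrays.asList("{\"id\":1}", "{\"id\":2}"),
                Arrays.asList("{\"id\":3}")));
        int handled = drain(source, payload -> System.out.println("row: " + payload));
        System.out.println("MsgCount=" + handled);
    }
}
```

With this shape, memory use is bounded by the size of one poll result rather than by the total number of messages, and the running count can still be stored in `globalMap` once the loop ends.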