STelkar1613587356
Contributor

Huge data processing in Talend using tJavaFlex - Kafka streaming

I am using the tJavaFlex component to build a Kafka consumer. The following is the code snippet that polls the stream to read messages:

// here is the main part of the component,
// a piece of code executed in the row loop
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
        if (records.isEmpty()) {
            System.out.println("inside break");
            break;
        }
        for (ConsumerRecord<String, String> record : records) {
            i++;
            listPayload.add(record.value());
        }
    }
    globalMap.put("ConsumerObj", consumer);
} catch (Exception e) {
    System.out.println("Error Consuming Msg: " + e);
} finally {
    consumer.close();
}

row21.payload = String.valueOf(listPayload);
globalMap.put("MsgCount", i);

The output of this component is redirected to tExtractJSONFields, as my stream contains JSON messages.

The issue with the above code is that as the number of messages grows, the ArrayList fills the entire heap and the job fails with the following error:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

Please suggest how I can push the messages collected in each poll to the next step for processing, clear the list, and continue the loop to read further messages, along the lines of the sketch below.
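Conceptually, this is the pattern I am after: hand each poll's payloads downstream, then drop the references so the heap stays bounded. A minimal sketch of the idea (processBatch here is a hypothetical placeholder for whatever pushes the batch to the next component, not a real Talend or Kafka API):

List<String> batch = new ArrayList<>();  // needs java.util.List / java.util.ArrayList imported
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
    if (records.isEmpty()) {
        break;  // nothing left on the topic for now
    }
    for (ConsumerRecord<String, String> record : records) {
        batch.add(record.value());
    }
    processBatch(batch);  // hypothetical: push this poll's messages to the next step
    batch.clear();        // release the payloads so they can be garbage-collected
}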

1 Reply
Anonymous
Not applicable

Hi @Sharad Telkar, would you mind posting the code for your "Start Code", "Main Code" and "End Code" sections, please? How these sections are coded is important. It should also be pointed out that leaving a Talend Job running to continually receive Kafka messages may not be the best way to consume them; Talend ESB may be a better option for this. But let's see if we can get this method working first.
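For reference, tJavaFlex runs its three sections at different times, which is why the split matters for a long-lived consumer. A rough sketch of how that division typically looks, assuming a standard Kafka client setup (props, context.topic and the variable names are placeholders, not your actual configuration):

// --- Start Code: runs once, before the first row ---
// (imports such as org.apache.kafka.clients.consumer.* go in the Advanced settings)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // props: your consumer Properties
consumer.subscribe(java.util.Collections.singletonList(context.topic)); // context.topic: placeholder

// --- Main Code: runs inside the row loop ---
// poll once, map each record to the output row, and let the next component process it

// --- End Code: runs once, after the loop finishes ---
consumer.close(); // closing here, not in Main, keeps the consumer alive across iterations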