TalendDisaster
Contributor II

Kafka-commit-offset-manually - java.lang.NullPointerException: Cannot invoke because "manual" is null

Hello,

I have been facing a problem for a few weeks and have searched every corner of the net for a solution without finding anything.

It concerns kafkaManualCommit with Apache Camel 3.20.6 or 3.20.9 in a Talend route.

It works from Talend Studio, but not when deployed as a bundle in a runtime.

 

Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null

 

This article helped me, but I'm stuck on the runtime: https://community.qlik.com/t5/Design-and-Development/Kafka-commit-offset-manually/td-p/2325795

 

Versions:

  • Talend Studio version : R2024-08
  • Java 17
  • Talend Runtime version (latest): Patch_20241018_R2024-10_v1-RT-8.0.1.R2024-05-RT

 

From Studio:

I created a custom bean jar with this code:

package org.example.myruntimekafka.beansjar.mycustombean;

import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;

public class myMisterbean {

    public static void kafkaManualCommit(Exchange exch) {
        KafkaManualCommit manual = exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        manual.commit();
    }
}

 

In a cProcessor:

org.example.myruntimekafka.beansjar.mycustombean.myMisterbean.kafkaManualCommit(exchange);

 

Auto commit is disabled for cKafka and allowManualCommit is true.
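
For reference, these two settings correspond to the Camel Kafka endpoint options autoCommitEnable and allowManualCommit. Below is a minimal sketch of what the resulting consumer URI might look like. KafkaUriSketch is a hypothetical helper, and the topic, broker address, and group id are placeholders rather than values from this route:

```java
// Hypothetical helper (not part of Talend or Camel): builds a Camel Kafka
// consumer URI with auto commit disabled and manual commit allowed.
final class KafkaUriSketch {

    static String consumerUri(String topic, String brokers, String groupId) {
        return "kafka:" + topic
                + "?brokers=" + brokers
                + "&groupId=" + groupId
                + "&autoCommitEnable=false"   // turn off periodic auto commit
                + "&allowManualCommit=true";  // allow commits via the manual-commit header
    }
}
```

Comparing a URI like this with the one logged by the runtime at startup could confirm that both options actually reach the deployed endpoint.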

 

So it now works from Talend Studio: I read a message with the cKafka consumer and commit the offset with the cProcessor.

 

From Runtime:

After deploying from the TAC, or placing the kar in the deploy directory, I get this error:

 

2024-10-29T17:15:37,189 | WARN | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #7 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 172 | 96 - org.apache.camel.camel-api - 3.20.9 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: toto]
2024-10-29T17:15:37,191 | ERROR | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #7 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 205 | 111 - org.apache.camel.camel-core-reifier - 3.20.9 | Failed delivery for (MessageId: A4A6B98485A1374-0000000000000000 on ExchangeId: A4A6B98485A1374-0000000000000000). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null

Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
java:149 myStreamKafkaPoc_cKafka_1/mySt from[kafka://XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX? 2419473435
...
java:153 myStreamKafkaPoc_cKafka_1/mySt Processor@0x42abfead 0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null
at org.example.myruntimekafka.beansjar.mycustombean.myMisterbean.kafkaManualCommit(myMisterbean.java:42) ~[mycustombean-8.0.1.jar:?]
at myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc$1.process(myStreamKafkaPoc.java:165) ~[bundleFile:?]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[?:?]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[?:?]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[?:?]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[?:?]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:110) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:125) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:78) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:328) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:160) ~[camel-kafka-3.20.6.jar:3.20.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:842) ~[?:?]
2024-10-29T17:15:37,196 | WARN | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #7 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 214 | 134 - org.apache.camel.camel-support - 3.20.9.20240425 | Error during processing. Exchange[A4A6B98485A1374-0000000000000000]. Caused by: [java.lang.NullPointerException - Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null]
java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null
at org.example.myruntimekafka.beansjar.mycustombean.myMisterbean.kafkaManualCommit(myMisterbean.java:42) ~[mycustombean-8.0.1.jar:?]
at myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc$1.process(myStreamKafkaPoc.java:165) ~[bundleFile:?]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[?:?]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[?:?]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[?:?]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[?:?]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:110) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:125) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:78) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:328) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:160) ~[camel-kafka-3.20.6.jar:3.20.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:842) ~[?:?]

 

I printed some information to see what is going on:

System.out.println("Exchange" + exch);

System.out.println("KafkaConstants" + KafkaConstants.MANUAL_COMMIT);

Boolean lastOne = exch.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);

System.out.println("bool" + lastOne);

 

That part works and prints:

ExchangeExchange[203894ECF30EE33-0000000000000000]
KafkaConstantsCamelKafkaManualCommit
booltrue

 

I also tried something like this:

exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class).commit();

And I get:

2024-10-29T11:37:54,972 | ERROR | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #5 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 205 | 111 - org.apache.camel.camel-core-reifier - 3.20.9 | Failed delivery for (MessageId: 2F88C0A67D3F5A5-0000000000000001 on ExchangeId: 2F88C0A67D3F5A5-0000000000000001). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because the return value of "org.apache.camel.Message.getHeader(String, java.lang.Class)" is null
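
One further check that might narrow this down (a diagnostic sketch, not a confirmed fix): a typed header lookup can return null either because the header is absent or because the stored object is not an instance of the requested class, which can happen in OSGi if the bean jar and the route bundle load the same class through different classloaders. Whether that is what happens here is only an assumption. HeaderDiagnostics below is a hypothetical helper with no Camel dependency; it takes the raw header value and the expected type and reports which case applies:

```java
// Hypothetical diagnostic helper: explains why a typed header lookup
// could come back null. Uses only the JDK, so it can live in the bean jar.
final class HeaderDiagnostics {

    static String describe(Object header, Class<?> expectedType) {
        if (header == null) {
            // Case 1: nothing was stored under that header name.
            return "header absent: no value was attached to this exchange";
        }
        // Case 2: a value exists; compare its class and loader with the expected type.
        Class<?> actual = header.getClass();
        StringBuilder sb = new StringBuilder();
        sb.append("header class=").append(actual.getName());
        sb.append(", header loader=").append(actual.getClassLoader());
        sb.append(", expected type=").append(expectedType.getName());
        sb.append(", expected loader=").append(expectedType.getClassLoader());
        sb.append(", assignable=").append(expectedType.isInstance(header));
        return sb.toString();
    }
}
```

In the bean it could be called with the untyped lookup, for example System.out.println(HeaderDiagnostics.describe(exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT), KafkaManualCommit.class)). If the output shows assignable=false with two different loaders, that would point toward the bean jar and the route bundle each seeing their own copy of camel-kafka (the stack trace shows camel-kafka-3.20.6.jar next to Camel 3.20.9 bundles). If it shows "header absent", the consumer configuration in the runtime would be the next thing to look at.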

 

If you have any ideas, please let me know!

 

Thanks for your help.

 

--

Gregg
