Unlock a world of possibilities! Login now and discover the exclusive benefits awaiting you.
Hello,
I'm facing to a problem since few weeks and I have looking for a solution until the limb of the net without nothing.
We are talking about kafkaManualCommit with Apache Camel 3.20.6 or 3.20.9 in a Talend route.
It's working from Talend Studio but not as a bundle deployment in a runtime.
Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null
This artcile helps me but I'm stuck on a runtime: https://community.qlik.com/t5/Design-and-Development/Kafka-commit-offset-manually/td-p/2325795
Versions:
From Studio:
I have created a custom bean jar with this code:
package org.example.myruntimekafka.beansjar.mycustombean;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
public class myMisterbean {
public static void kafkaManualCommit(Exchange exch) {
KafkaManualCommit manual = exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commit();
}
}
In a cProcessor :
org.example.myruntimekafka.beansjar.mycustombean.myMisterbean.kafkaManualCommit(exchange);
CommitAuto is disabled for Ckafka and allowManualCommit is true.
So now it's working from Talend Studio, I read a message with the cKafka consumer and commit offset with cProcessor.
From Runtime:
After deploy from the TAC or download as a kar in the deploy directory, I have this error:
2024-10-29T17:15:37,189 | WARN | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #7 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 172 | 96 - org.apache.camel.camel-api - 3.20.9 | Exchange[ExchangePattern: InOnly, BodyType: String, Body: toto]
2024-10-29T17:15:37,191 | ERROR | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #7 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 205 | 111 - org.apache.camel.camel-core-reifier - 3.20.9 | Failed delivery for (MessageId: A4A6B98485A1374-0000000000000000 on ExchangeId: A4A6B98485A1374-0000000000000000). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null
Message History (complete message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
java:149 myStreamKafkaPoc_cKafka_1/mySt from[kafka://XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX? 2419473435
...
java:153 myStreamKafkaPoc_cKafka_1/mySt Processor@0x42abfead 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null
at org.example.myruntimekafka.beansjar.mycustombean.myMisterbean.kafkaManualCommit(myMisterbean.java:42) ~[mycustombean-8.0.1.jar:?]
at myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc$1.process(myStreamKafkaPoc.java:165) ~[bundleFile:?]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[?:?]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[?:?]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[?:?]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[?:?]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:110) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:125) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:78) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:328) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:160) ~[camel-kafka-3.20.6.jar:3.20.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:842) ~[?:?]
2024-10-29T17:15:37,196 | WARN | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #7 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 214 | 134 - org.apache.camel.camel-support - 3.20.9.20240425 | Error during processing. Exchange[A4A6B98485A1374-0000000000000000]. Caused by: [java.lang.NullPointerException - Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null]
java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because "manual" is null
at org.example.myruntimekafka.beansjar.mycustombean.myMisterbean.kafkaManualCommit(myMisterbean.java:42) ~[mycustombean-8.0.1.jar:?]
at myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc$1.process(myStreamKafkaPoc.java:165) ~[bundleFile:?]
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:65) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[?:?]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[?:?]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) ~[?:?]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[?:?]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[?:?]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[bundleFile:3.20.9.20240425]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor.processExchange(KafkaRecordProcessor.java:110) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processRecord(KafkaRecordProcessorFacade.java:125) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade.processPolledRecords(KafkaRecordProcessorFacade.java:78) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.startPolling(KafkaFetchRecords.java:328) ~[camel-kafka-3.20.6.jar:3.20.6]
at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:160) ~[camel-kafka-3.20.6.jar:3.20.6]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at java.lang.Thread.run(Thread.java:842) ~[?:?]
I have print some informations to see something
System.out.println("Exchange" + exch);
System.out.println("KafkaConstants" + KafkaConstants.MANUAL_COMMIT);
Boolean lastOne = exch.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
System.out.println("bool" + lastOne);
but it works:
ExchangeExchange[203894ECF30EE33-0000000000000000]
KafkaConstantsCamelKafkaManualCommit
booltrue
Also something like this
exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class).commit();
And I have
2024-10-29T11:37:54,972 | ERROR | Camel (myruntimekafka.mystreamkafkapoc_0_1.myStreamKafkaPoc) thread #5 - KafkaConsumer[XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX] | org.apache.camel.spi.CamelLogger 205 | 111 - org.apache.camel.camel-core-reifier - 3.20.9 | Failed delivery for (MessageId: 2F88C0A67D3F5A5-0000000000000001 on ExchangeId: 2F88C0A67D3F5A5-0000000000000001). Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException: Cannot invoke "org.apache.camel.component.kafka.consumer.KafkaManualCommit.commit()" because the return value of "org.apache.camel.Message.getHeader(String, java.lang.Class)" is null
If you have any idea !
Thanks for your help.
--
Gregg