
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Kafka - commit offset manually
Hi there,
I would like to know if somebody can tell me how it is possible to commit manually the offset of a consummer ?
I would like to deactivate the "Auto Commit Enable" option on my cKafka, do what I have to do and only validate the message when everything is done.
I saw I have to use commitSync method, but I don't know how.
Thanks a lot for your help !
Accepted Solutions

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Your code worked for me. Be sure to set allowManualCommit=true in Kafka Properties on the Advanced Settings tab of you cKafka component. Of course, you'll want to uncheck Auto Commit Enabled on Basic Settings of the component.
I also found a code snip it in this article that is useful if you use Kafka's ability to grab a batch of message when consuming messages. This snip-it will ensure it only commits if the route is on the last message of the batch.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hello,
Camel-kafka has only recently added support for manual offset commit.
Please refer to this documentation:
https://camel.apache.org/staging/components/latest/kafka-component.html
Let us know if the property "allowManualCommit" is what you are looking for.
Best regards
Sabrina

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi xdshi,
Thanks a lot for the answer !
I have read the doc, and tried to put this code in a cProcessor :
//Import part
//import java.util.List;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.component.kafka.KafkaConstants;
//Code part :
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
But it does not work.
Best regards,

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Your code worked for me. Be sure to set allowManualCommit=true in Kafka Properties on the Advanced Settings tab of you cKafka component. Of course, you'll want to uncheck Auto Commit Enabled on Basic Settings of the component.
I also found a code snip it in this article that is useful if you use Kafka's ability to grab a batch of message when consuming messages. This snip-it will ensure it only commits if the route is on the last message of the batch.

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi cbeasley,
Thank you for your reply !
Actually, it works. I was not using the correct method to check the current offset.
If I use Kafka tools for exemple, I can see that the offset is commited manually when I decide it.
So, in fact, if you want to cleanly manually commit a kafka message from a Talend route, you can :
- create a beans with a method kafkaManualCommit like :
package beans; import org.apache.camel.Exchange; import org.apache.camel.component.kafka.KafkaManualCommit; import org.apache.camel.component.kafka.KafkaConstants; public class RouteCode { /** * kafkaManualCommit: commit the current offset for a kafka message * * * {talendTypes} String * * {Category} User Defined * * {param} Exchange(exch) input: The current exchange * * {example} kafkaManualCommit(exch) # message commited */ public static void kafkaManualCommit(Exchange exch) { KafkaManualCommit manual = exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); manual.commitSync(); } }
- set a BrokerList, ClientId, Topic and GroupId in each cKafka component
- untick the "Auto commit enable" property in each cKafka consumer
- set the "Auto Commit Enable" property to LATEST in each cKafka consumer
- add the Kafka property "allowManualCommit" Value="true" in the advanced settings of each cKafka consumer
- add a cProcessor at the end of each Route (or wherever you want to commit your offset) with the following code
/* * Provide own codes to consume or translate the message exchanges. * @param org.apache.camel.Exchange exchange */ beans.RouteCode.kafkaManualCommit(exchange);
It works for us like this.
Thank you for your help !

- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
hey @Beasley Chris would you mind showing me a screenshot of how you set this? I've tried but not able to commit on the offset and keep getting from the 1st message
