SncJt
Contributor III

Kafka - commit offset manually

Hi there,

Can somebody tell me how to manually commit a consumer's offset?

I would like to disable the "Auto Commit Enable" option on my cKafka component, do my processing, and only acknowledge the message once everything is done.

I saw that I have to use the commitSync method, but I don't know how.

Thanks a lot for your help!


5 Replies
Anonymous
Not applicable

Hello,

Camel-kafka has only recently added support for manual offset commit.

Please refer to this documentation:

https://camel.apache.org/staging/components/latest/kafka-component.html

Let us know if the property "allowManualCommit" is what you are looking for.

Best regards

Sabrina

SncJt
Contributor III
Author

Hi xdshi,

Thanks a lot for the answer!

I have read the doc and tried putting this code in a cProcessor:

// Import part
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.component.kafka.KafkaConstants;

// Code part
// Fetch the manual-commit handle that camel-kafka attaches to the message, then commit the offset.
KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();

But it does not work.

Best regards,

Anonymous
Not applicable

Your code worked for me. Be sure to set allowManualCommit=true in Kafka Properties on the Advanced Settings tab of your cKafka component. Of course, you'll also want to uncheck Auto Commit Enable on the Basic Settings tab of the component.

I also found a code snippet in this answer that is useful if you rely on Kafka's ability to fetch a batch of messages when consuming. The snippet ensures the route only commits when it is on the last message of the batch.

https://stackoverflow.com/questions/51843677/how-to-transactionally-poll-kafka-from-camel/52300250#5...
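
To illustrate the "commit only on the last message of the batch" idea without reproducing the linked snippet, here is a minimal sketch that runs without Camel on the classpath. Everything in it is a stand-in: the `ManualCommit` interface plays the role of Camel's `KafkaManualCommit`, a plain `Map` plays the role of the message headers, and the two header-name literals are assumptions. In a real route you would read the headers through the `KafkaConstants` fields (`MANUAL_COMMIT`, and what I believe is `LAST_RECORD_BEFORE_COMMIT`) rather than through string literals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Camel's KafkaManualCommit, so the sketch needs no Camel jars.
interface ManualCommit {
    void commitSync();
}

class BatchCommitSketch {
    // Assumed header names; in a real route use the KafkaConstants fields instead.
    static final String MANUAL_COMMIT = "CamelKafkaManualCommit";
    static final String LAST_RECORD_BEFORE_COMMIT = "kafka.LAST_RECORD_BEFORE_COMMIT";

    /**
     * Commits only when the message is flagged as the last record of the polled batch.
     * Returns true if a commit was issued.
     */
    static boolean commitIfLastRecord(Map<String, Object> headers) {
        if (!Boolean.TRUE.equals(headers.get(LAST_RECORD_BEFORE_COMMIT))) {
            return false; // not the last record of the batch: nothing to commit yet
        }
        Object manual = headers.get(MANUAL_COMMIT);
        if (!(manual instanceof ManualCommit)) {
            return false; // header missing, e.g. manual commit is not enabled
        }
        ((ManualCommit) manual).commitSync();
        return true;
    }

    public static void main(String[] args) {
        int[] commits = {0};
        ManualCommit counter = () -> commits[0]++;
        // Simulate a batch of three messages; only the third is flagged as last.
        for (int i = 1; i <= 3; i++) {
            Map<String, Object> headers = new HashMap<>();
            headers.put(MANUAL_COMMIT, counter);
            headers.put(LAST_RECORD_BEFORE_COMMIT, i == 3);
            commitIfLastRecord(headers);
        }
        System.out.println("commits issued: " + commits[0]); // prints "commits issued: 1"
    }
}
```

The trade-off of committing once per batch is that a failure partway through the batch leaves the whole batch uncommitted, so those messages may be delivered again and the route should tolerate reprocessing.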

SncJt
Contributor III
Author

Hi cbeasley,

Thank you for your reply!

Actually, it works. I was not using the correct method to check the current offset.

If I use Kafka tools, for example, I can see that the offset is committed manually at the point I choose.

So, to manually commit a Kafka message cleanly from a Talend route, you can:

- create a bean with a kafkaManualCommit method, like this:

package beans;

import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.component.kafka.KafkaConstants;


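public class RouteCode {

    /**
     * kafkaManualCommit: commit the current offset for a Kafka message
     *
     * {talendTypes} String
     *
     * {Category} User Defined
     *
     * {param} Exchange(exch) input: The current exchange
     *
     * {example} kafkaManualCommit(exch) # message committed
     */
    public static void kafkaManualCommit(Exchange exch) {
        // The header is only populated when allowManualCommit=true is set on the consumer.
        KafkaManualCommit manual = exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        if (manual == null) {
            throw new IllegalStateException("No manual commit header found; is allowManualCommit=true set on the cKafka consumer?");
        }
        manual.commitSync();
    }

}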

- set a BrokerList, ClientId, Topic and GroupId in each cKafka component

- untick the "Auto Commit Enable" property in each cKafka consumer

- set the "Auto Offset Reset" property to LATEST in each cKafka consumer

- add the Kafka property "allowManualCommit" with the value "true" in the advanced settings of each cKafka consumer

- add a cProcessor at the end of each route (or wherever you want to commit your offset) with the following code:

/*
* Provide own codes to consume or translate the message exchanges.
* @param org.apache.camel.Exchange exchange
*/
beans.RouteCode.kafkaManualCommit(exchange);

It works for us like this.

 

Thank you for your help!
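
For readers working in plain Camel rather than Talend Studio, the consumer settings listed above roughly correspond to options on a camel-kafka endpoint URI. The sketch below only assembles such a URI as a string. Apart from `allowManualCommit`, which is named in this thread, the option names (`brokers`, `groupId`, `clientId`, `autoCommitEnable`, `autoOffsetReset`) and the `kafka:topic?...` shape are my understanding of the component and should be checked against the camel-kafka version you run. The topic, broker, group and client values are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class KafkaEndpointSketch {

    /** Builds a consumer endpoint URI with auto-commit off and manual commit allowed. */
    static String consumerUri(String topic, String brokers, String groupId, String clientId) {
        // LinkedHashMap keeps the options in insertion order, so the URI is predictable.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", brokers);
        options.put("groupId", groupId);
        options.put("clientId", clientId);
        options.put("autoCommitEnable", "false");  // assumed name for the "Auto Commit Enable" checkbox
        options.put("allowManualCommit", "true");  // exposes the manual commit header
        options.put("autoOffsetReset", "latest");  // assumed name for the LATEST setting
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "kafka:" + topic + "?" + query;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        System.out.println(consumerUri("orders", "localhost:9092", "orders-group", "orders-client"));
    }
}
```

Keeping the group ID stable between runs matters here: committed offsets are stored per consumer group, so a consumer that changes its group ID will not see the offsets it committed earlier.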

ANew1661729706
Contributor

Hey @Beasley Chris, would you mind showing me a screenshot of how you set this up? I've tried, but I'm not able to commit the offset and I keep receiving messages starting from the first one.