Do not input private or sensitive data. View Qlik Privacy & Cookie Policy.
Skip to main content

Announcements
Join us in Bucharest on Sept 18th for Qlik's AI Reality Tour! Register Now
cancel
Showing results for 
Search instead for 
Did you mean: 
SncJt
Contributor III
Contributor III

Kafka - commit offset manually

Hi there,

 

I would like to know if somebody can tell me how it is possible to commit manually the offset of a consummer ?

 

I would like to deactivate the "Auto Commit Enable" option on my cKafka, do what I have to do and only validate the message when everything is done.

 

I saw I have to use commitSync method, but I don't know how.

 

Thanks a lot for your help !

Labels (2)
1 Solution

Accepted Solutions
Anonymous
Not applicable

Your code worked for me.  Be sure to set allowManualCommit=true in Kafka Properties on the Advanced Settings tab of you cKafka component.  Of course, you'll want to uncheck Auto Commit Enabled on Basic Settings of the component.

 

I also found a code snip it in this article that is useful if you use Kafka's ability to grab a batch of message when consuming messages.  This snip-it will ensure it only commits if the route is on the last message of the batch.

https://stackoverflow.com/questions/51843677/how-to-transactionally-poll-kafka-from-camel/52300250#5...

View solution in original post

5 Replies
Anonymous
Not applicable

Hello,

Camel-kafka has only recently added support for manual offset commit.

Please refer to this documentation:

https://camel.apache.org/staging/components/latest/kafka-component.html

Let us know if the property "allowManualCommit" is what you are looking for.

Best regards

Sabrina

SncJt
Contributor III
Contributor III
Author

Hi xdshi,

 

Thanks a lot for the answer !

 

I have read the doc, and tried to put this code in a cProcessor :

 

//Import part

//import java.util.List;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.component.kafka.KafkaConstants;

 

//Code part :

KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();

 

But it does not work.

 

Best regards,

Anonymous
Not applicable

Your code worked for me.  Be sure to set allowManualCommit=true in Kafka Properties on the Advanced Settings tab of you cKafka component.  Of course, you'll want to uncheck Auto Commit Enabled on Basic Settings of the component.

 

I also found a code snip it in this article that is useful if you use Kafka's ability to grab a batch of message when consuming messages.  This snip-it will ensure it only commits if the route is on the last message of the batch.

https://stackoverflow.com/questions/51843677/how-to-transactionally-poll-kafka-from-camel/52300250#5...

SncJt
Contributor III
Contributor III
Author

Hi cbeasley,

 

Thank you for your reply !

 

Actually, it works. I was not using the correct method to check the current offset.

 

If I use Kafka tools for exemple, I can see that the offset is commited manually when I decide it.

 

So, in fact, if you want to cleanly manually commit a kafka message from a Talend route, you can :

- create a beans with a method kafkaManualCommit like :

package beans;

import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.component.kafka.KafkaConstants;


public class RouteCode {    
    
    /**
     * kafkaManualCommit: commit the current offset for a kafka message
     * 
     * 
     * {talendTypes} String
     * 
     * {Category} User Defined
     * 
     * {param} Exchange(exch) input: The current exchange
     * 
     * {example} kafkaManualCommit(exch) # message commited
     */
    public static void kafkaManualCommit(Exchange exch) {
        
    	KafkaManualCommit manual = exch.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    	manual.commitSync();
    	
    }
    
}

- set a BrokerList, ClientId, Topic and GroupId in each cKafka component

- untick the "Auto commit enable" property in each cKafka consumer

- set the "Auto Commit Enable" property to LATEST in each cKafka consumer

- add the Kafka property "allowManualCommit" Value="true" in the advanced settings of each cKafka consumer

- add a cProcessor at the end of each Route (or wherever you want to commit your offset) with the following code 

/*
* Provide own codes to consume or translate the message exchanges.
* @param org.apache.camel.Exchange exchange
*/
beans.RouteCode.kafkaManualCommit(exchange);

It works for us like this.

 

Thank you for your help !

ANew1661729706
Contributor
Contributor

hey @Beasley Chris​ would you mind showing me a screenshot of how you set this? I've tried but not able to commit on the offset and keep getting from the 1st message