aleno
Contributor

Kafka target: How to set partitioning strategy

We're using Qlik Replicate to produce CDC changes to Kafka and would like to change the producer's partitioning strategy to "murmur2", which is the default for the Java producer and Kafka Streams.

The librdkafka documentation (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) lists a producer property, 'partitioner=murmur2', that should take care of this. Except it does not: messages are still partitioned using the CRC32 hash of the message key.

I want the partitioner to match the one used by Kafka Streams so that I can consume the CDC events from Qlik Replicate and preserve the transaction order of events. Right now, when the topic is consumed by Kafka Streams, the first thing that happens is a repartition step in which every event is rehashed using murmur2 and reassigned to the corresponding partition, and as a result we lose the transaction order.
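To make the mismatch concrete, the following JDK-only sketch computes, for each key, the partition librdkafka's `consistent` partitioner picks (CRC32 of the key bytes modulo the partition count) and the partition the Java producer's default partitioner and Kafka Streams pick for a keyed record (positive murmur2 hash modulo the partition count). Keys where the two differ are the ones the repartition step moves. The `murmur2` method is a port of Kafka's `Utils.murmur2`, included so that no dependencies are needed; the class name `PartitionerMismatch` and the `ID-000` to `ID-009` sample keys are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class PartitionerMismatch {

    // librdkafka "consistent" partitioner: CRC32 of the key bytes modulo the partition count.
    static int consistentPartition(byte[] key, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(key, 0, key.length);
        return (int) (crc.getValue() % numPartitions);
    }

    // Java producer / Kafka Streams default for keyed records:
    // murmur2 hash with the sign bit cleared, modulo the partition count.
    static int murmur2Partition(byte[] key, int numPartitions) {
        return (murmur2(key) & 0x7fffffff) % numPartitions;
    }

    // Port of Kafka's Utils.murmur2 (32-bit MurmurHash2, seed 0x9747b28c).
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through, as in Kafka).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        for (int i = 0; i < 10; i++) {
            String key = String.format("ID-%03d", i);
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            int written = consistentPartition(bytes, numPartitions);
            int expected = murmur2Partition(bytes, numPartitions);
            System.out.println(key + ": written to " + written + ", Streams expects " + expected
                    + (written != expected ? "  <- moved by repartition" : ""));
        }
    }
}
```

The port has to match Kafka's seed and byte order exactly; a generic MurmurHash2 implementation with a different seed would produce different partitions.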

My Replicate settings summary looks like this:

messageKey : PRIMARY_KEY
partitionMapping : MESSAGE_KEY
rdkafkaProperties : partitioner=murmur2
rdkafkaTopicProperties : partitioner=murmur2

 

4 Replies
john_wang
Support

Hello @aleno ,

The partitioner is a topic configuration property (not a global one), so you can only set it with the following internal parameter:

rdkafkaTopicProperties : partitioner=murmur2

In Replicate version 6.6 it seems to work. At least, the task log file says:

2020-10-09T20:18:03:145819 [TARGET_APPLY    ]T:  Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)
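One quick way to check which topic properties Replicate handed to librdkafka is to pull this line out of the task log. The sketch below assumes the log line format shown above (the property string in single quotes after `Kafka topic properties:`); the class and method names are hypothetical. It only confirms that the property was passed to the client, not which partitioner was actually applied.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TopicPropertiesLog {
    // Matches e.g.: Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)
    private static final Pattern TOPIC_PROPS =
            Pattern.compile("Kafka topic properties: '([^']*)'");

    // Returns the quoted property string from a task log line, if the line contains one.
    static Optional<String> topicProperties(String logLine) {
        Matcher m = TOPIC_PROPS.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        String line = "2020-10-09T20:18:03:145819 [TARGET_APPLY    ]T:  "
                + "Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)";
        System.out.println(topicProperties(line).orElse("<not found>"));
    }
}
```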

 

However, I'm not 100% sure which partitioner (random, consistent, murmur2, fnv1a, ...) is actually used with the above settings. Could you please let me know how you confirmed that Replicate still partitions using the CRC32 hash of the message key?

 

Thanks,

 

John.

Help users find answers! Do not forget to mark a solution that worked for you! If already marked, give it a thumbs up!
aleno
Contributor
Author

Hi again.

I have tested this again in a more controlled setting, and Qlik Replicate is still publishing to the target topic using consistent (CRC32 hash of key) as the partitioner. I'll explain how I tested it, so bear with me.

1. Fixing the topic property

First, I removed the global parameter and kept only rdkafkaTopicProperties, as you instructed. Then I used the Log Manager to increase the log level to TRACE for both KAFKA_TARGET and KAFKA_LOAD.

2. Creating a controlled source table and target

For this step I created a new table with one column, 'ID', and populated it with the values ID-000 to ID-009.

I also set up a global transformation that adds the column $key and sets it to the value of $ID. This was needed so that the Kafka message key is ID-000 (unquoted) instead of the default behavior, where the key is wrapped in quotes: "ID-000".
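The quotes matter because the partitioner hashes the raw key bytes: the quoted key `"ID-000"` is 8 bytes while `ID-000` is 6 bytes, so the two will generally land on different partitions. A minimal sketch using the JDK's `java.util.zip.CRC32`, which computes the same standard CRC-32 that librdkafka's `consistent` partitioner applies to the key before taking it modulo the partition count; the class name is illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class QuotedKeyDemo {
    // CRC32 of the raw key bytes modulo the partition count,
    // i.e. what librdkafka's "consistent" partitioner computes.
    static int consistentPartition(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        String unquoted = "ID-000";
        String quoted = "\"ID-000\"";  // default key form, quotes are part of the bytes
        for (String key : new String[] {unquoted, quoted}) {
            System.out.println(key + " (" + key.getBytes(StandardCharsets.UTF_8).length
                    + " bytes) -> partition " + consistentPartition(key, 10));
        }
    }
}
```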

I also created the target topic in Kafka and set it up with 10 partitions.

3. Computing the expected partitions

Since I knew what the keys were going to be, I wrote a small test case that prints the murmur2 and CRC32 partitions for each key on a Kafka topic with 10 partitions.

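import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

class PartitionerTest {
    private static final int NUM_PARTITIONS = 10;

    @Test
    void test_print_partition() {
        for (int i = 0; i < 10; i++) {
            print_partition(String.format("ID-%03d", i));
        }
    }

    // Prints the partition each partitioner would choose for the given key.
    void print_partition(String key) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int partition_murmur2 = murmur2(keyBytes);
        int partition_crc32 = rd_crc32(keyBytes);
        System.out.println(key + " Murmur2: " + partition_murmur2 + " CRC32: " + partition_crc32);
    }

    // librdkafka "consistent" partitioner: CRC32 of the key modulo the partition count.
    // java.util.zip.CRC32 computes the same standard CRC-32 checksum.
    int rd_crc32(byte[] keyBytes) {
        CRC32 crc32 = new CRC32();
        crc32.update(keyBytes, 0, keyBytes.length);
        return (int) (crc32.getValue() % NUM_PARTITIONS);
    }

    // Java producer / Kafka Streams default: positive murmur2 hash modulo the partition count.
    int murmur2(byte[] keyBytes) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % NUM_PARTITIONS;
    }
}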

 

Running this test case yielded the following result.

Key      Murmur2 partition   CRC32 partition
ID-000   4                   2
ID-001   0                   8
ID-002   5                   6
ID-003   4                   8
ID-004   9                   3
ID-005   6                   5
ID-006   9                   7
ID-007   0                   9
ID-008   6                   4
ID-009   1                   0

4. Starting the task and reloading the target

With everything set up, the settings summary looks like this:

brokers : ...
messageKey : PRIMARY_KEY
partitionMapping : MESSAGE_KEY
rdkafkaTopicProperties : partitioner=murmur2
topic : cdc-test-target

 

Then I started the task with Run -> Reload Target.

After the Full Load completed, I searched the logs and found the line about setting the Kafka topic properties:

00013012: 2020-10-12T09:35:24 [TARGET_LOAD     ]T:  Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)

 

I then checked the result on the Kafka topic:

/ # kafkacat -b kafka-broker:9092 -C -t cdc-test-target -f '%p %k : %s\n'
0 ID-009 : {"magic": "atMSG", ...
3 ID-004 : {"magic": "atMSG", ...
6 ID-002 : {"magic": "atMSG", ...
9 ID-007 : {"magic": "atMSG", ...
2 ID-000 : {"magic": "atMSG", ...
5 ID-005 : {"magic": "atMSG", ...
8 ID-001 : {"magic": "atMSG", ...
8 ID-003 : {"magic": "atMSG", ...
4 ID-008 : {"magic": "atMSG", ...
7 ID-006 : {"magic": "atMSG", ...

 

The format string prints the partition number, then the key, then " : " and the value (truncated here). Comparing the partition numbers with the table of precomputed hashes, every key matches the CRC32 column and not the murmur2 column, which is the one I want.
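The comparison can be automated. This sketch parses lines printed with kafkacat's `-f '%p %k : %s\n'` format and counts how many agree with each column of the precomputed table; the expected values and the observed lines in `main` are copied from this post, while the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PartitionerCheck {
    // Expected partitions per key for a 10-partition topic (copied from the table above).
    static final Map<String, Integer> MURMUR2 = new HashMap<>();
    static final Map<String, Integer> CRC32 = new HashMap<>();
    static {
        int[] m = {4, 0, 5, 4, 9, 6, 9, 0, 6, 1};
        int[] c = {2, 8, 6, 8, 3, 5, 7, 9, 4, 0};
        for (int i = 0; i < 10; i++) {
            String key = String.format("ID-%03d", i);
            MURMUR2.put(key, m[i]);
            CRC32.put(key, c[i]);
        }
    }

    // Counts the "partition key : value" lines whose partition matches the expected map.
    static int matches(List<String> lines, Map<String, Integer> expected) {
        int count = 0;
        for (String line : lines) {
            String[] parts = line.split(" ", 3);  // [partition, key, rest]
            if (parts.length < 2) continue;
            Integer want = expected.get(parts[1]);
            if (want != null && want == Integer.parseInt(parts[0])) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> observed = Arrays.asList(
                "0 ID-009 : ...", "3 ID-004 : ...", "6 ID-002 : ...", "9 ID-007 : ...",
                "2 ID-000 : ...", "5 ID-005 : ...", "8 ID-001 : ...", "8 ID-003 : ...",
                "4 ID-008 : ...", "7 ID-006 : ...");
        // prints: murmur2 matches: 0/10, CRC32 matches: 10/10
        System.out.println("murmur2 matches: " + matches(observed, MURMUR2) + "/" + observed.size()
                + ", CRC32 matches: " + matches(observed, CRC32) + "/" + observed.size());
    }
}
```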

5. Let's try everything again using Partition strategy: Random

Out of curiosity, I stopped the task, changed the Partition strategy from By message key to the other option, Random, and then started the task and reloaded the target again. In the output, the partitions the messages went to did not match either column of the table, so I reloaded the table two more times to see whether the assignment was consistent or random, and I concluded that Random truly is random. That leaves the conclusion that By message key somehow implies consistent (CRC32 hash of key), regardless of whether I override the partitioner using rdkafkaTopicProperties.
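The reload check can be expressed the same way: capture the key-to-partition mapping from each reload and flag keys whose partition changed. A deterministic partitioner (such as consistent or murmur2) never moves a key between reloads of a topic with the same partition count, so any change points to random assignment. A minimal sketch; the names are hypothetical, and the two runs in `main` use made-up values for illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class ReloadStability {
    // Parses "partition key : value" lines (kafkacat -f '%p %k : %s\n') into key -> partition.
    static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> result = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ", 3);
            if (parts.length >= 2) result.put(parts[1], Integer.parseInt(parts[0]));
        }
        return result;
    }

    // Keys whose partition in any later run differs from the first run.
    static Set<String> unstableKeys(List<Map<String, Integer>> runs) {
        Set<String> unstable = new TreeSet<>();
        Map<String, Integer> first = runs.get(0);
        for (Map<String, Integer> run : runs.subList(1, runs.size())) {
            for (Map.Entry<String, Integer> e : run.entrySet()) {
                Integer before = first.get(e.getKey());
                if (before != null && !before.equals(e.getValue())) unstable.add(e.getKey());
            }
        }
        return unstable;
    }

    public static void main(String[] args) {
        // Hypothetical output from two reloads (illustrative values only).
        Map<String, Integer> run1 = parse(Arrays.asList("1 ID-000 : ...", "7 ID-001 : ..."));
        Map<String, Integer> run2 = parse(Arrays.asList("4 ID-000 : ...", "7 ID-001 : ..."));
        // prints: Keys that moved: [ID-000]
        System.out.println("Keys that moved: " + unstableKeys(Arrays.asList(run1, run2)));
    }
}
```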

john_wang
Support

Hi Aleno,

Please allow me some time; I need to do more investigation and will come back to you.

Regards,

John.

john_wang
Support

Hello @aleno ,

 

Sorry for the delay. This issue needs additional investigation, so I suggest you open a Salesforce support case and the support team will continue working on it.

 

Have a nice day,

John.
