
Kafka target: How to set partitioning strategy
We're using the Qlik Replicate product to produce CDC changes to Kafka and would like to change the partitioning strategy for the producer to the "murmur2" strategy, which is the default for the Java producer and Kafka Streams.
According to the librdkafka documentation (https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md), there is a producer property 'partitioner=murmur2' which should take care of this. Except it does not: messages are still partitioned using the CRC32 hash of the message key.
The reason I want to change the partitioner to match the one used by Kafka Streams is to be able to consume the CDC events from Qlik Replicate while maintaining the transaction order of events. What's happening right now is that when the topic is consumed from Kafka Streams, the first thing that happens is a repartition step: every event from every partition gets rehashed using murmur2 and reassigned to the matching partition, and we lose the transaction order.
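For reference, the partition assignment the Java producer's default partitioner (and therefore Kafka Streams) performs for a non-null key boils down to a murmur2 hash of the key bytes modulo the partition count. Here is a minimal sketch using the hash utilities from kafka-clients (the wrapper class and method names are just illustrative):

import org.apache.kafka.common.utils.Utils;

public class DefaultPartitionSketch {
    // The Java default partitioner for a non-null key:
    // murmur2 hash of the key bytes, made non-negative, modulo the partition count.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // For example, a key like "ID-000" against a 10-partition topic.
        System.out.println(partitionFor("ID-000".getBytes(), 10));
    }
}

This is the assignment I would like Replicate's producer to reproduce.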
My replicate settings summary looks like this:
messageKey : PRIMARY_KEY
partitionMapping : MESSAGE_KEY
rdkafkaProperties : partitioner=murmur2
rdkafkaTopicProperties : partitioner=murmur2

Hello @aleno ,
Partitioner is a topic configuration property (not a global one), so you can only use the internal parameter below:
rdkafkaTopicProperties : partitioner=murmur2
to implement it.
In Replicate version 6.6 it seems to work. At least the task log file says:
2020-10-09T20:18:03:145819 [TARGET_APPLY ]T: Kafka topic properties: 'partitioner=murmur2' (kafka_client.c:902)
However, I'm not 100% sure which partitioner (random, consistent, murmur2, fnv1a...) is actually in effect with the above settings. Could you please let me know how you confirmed that Replicate is "still partitioning using the crc32 hash of the message key"?
thanks,
John.

Hi again.
I have tested this in a more controlled setting this time, and Qlik Replicate is still publishing to the target topic using consistent (CRC32 hash of the key) as the partitioner. I'll explain how I tested it, so bear with me.
1. Fixing the topic property
The first thing I did was to remove the global parameter and only keep rdkafkaTopicProperties, as you instructed. Then I used the Log Manager to increase the log level to TRACE for both KAFKA_TARGET and KAFKA_LOAD.
2. Creating a controlled source table and target
For this step I created a new table with one column, 'ID', and populated it with the values ID-000 to ID-009.
I also set up a global transformation to add the column $key and set it to the value from $ID. This step was needed to get the Kafka message key to be ID-000 (i.e. unquoted) instead of the default behavior where it was quoted as "ID-000".
I also created the target topic in Kafka and set it up with 10 partitions.
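As a side note, a topic with 10 partitions like this one can also be created programmatically with the Kafka AdminClient; a minimal sketch, assuming a broker reachable at kafka-broker:9092 and a replication factor of 1 for a test setup:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTargetTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; adjust to your environment.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 10 partitions, replication factor 1 (assumption for a test environment).
            NewTopic topic = new NewTopic("cdc-test-target", 10, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}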
3. Computing the expected partitions
Given that I know what my expected keys are going to be, I created a little test case to print the murmur2 and CRC32 hashed partition values for a Kafka topic with 10 partitions.
import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

class PartitionerTest {

    private static final int NUM_PARTITIONS = 10;

    @Test
    void test_print_partition() {
        for (int i = 0; i < 10; i++) {
            print_partition(String.format("ID-%03d", i));
        }
    }

    void print_partition(String key) {
        int partition_murmur2 = murmur2(key.getBytes());
        int partition_crc32 = rd_crc32(key.getBytes());
        System.out.println(key + " Murmur2: " + partition_murmur2 + " CRC32: " + partition_crc32);
    }

    // Partition the way librdkafka's "consistent" partitioner does: CRC32 of the key modulo the partition count.
    int rd_crc32(byte[] keyBytes) {
        Crc32 crc32 = new Crc32();
        crc32.update(keyBytes, 0, keyBytes.length);
        long c = crc32.getValue();
        return (int) (c % NUM_PARTITIONS);
    }

    // Partition the way the Java producer's default partitioner does: murmur2 of the key modulo the partition count.
    int murmur2(byte[] keyBytes) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % NUM_PARTITIONS;
    }
}
Running this test case yielded the following result.
KEY Murmur2(key) Crc32(key)
ID-000 4 2
ID-001 0 8
ID-002 5 6
ID-003 4 8
ID-004 9 3
ID-005 6 5
ID-006 9 7
ID-007 0 9
ID-008 6 4
ID-009 1 0
4. Starting the task and reloading the target
Now that everything is set up, the settings summary looks like this.
brokers : ...
messageKey : PRIMARY_KEY
partitionMapping : MESSAGE_KEY
rdkafkaTopicProperties : partitioner=murmur2
topic : cdc-test-target
And I'm ready to start the task Run -> Reload Target...
After the Full Load completed, I searched the logs and found the line about setting the Kafka topic properties.
00013012: 2020-10-12T09:35:24 [TARGET_LOAD ]T: Kafka topic properties: 'partitioner=murmur2' (kafka_client.c:902)
I then went on and checked the result on the Kafka topic.
/ # kafkacat -b kafka-broker:9092 -C -t cdc-test-target -f '%p %k : %s\n'
0 ID-009 : {"magic": "atMSG", ...
3 ID-004 : {"magic": "atMSG", ...
6 ID-002 : {"magic": "atMSG", ...
9 ID-007 : {"magic": "atMSG", ...
2 ID-000 : {"magic": "atMSG", ...
5 ID-005 : {"magic": "atMSG", ...
8 ID-001 : {"magic": "atMSG", ...
8 ID-003 : {"magic": "atMSG", ...
4 ID-008 : {"magic": "atMSG", ...
7 ID-006 : {"magic": "atMSG", ...
The format string used here prints the partition number followed by the key, then a ':' and the value (the output is truncated here). When comparing the partition numbers against the key values in the table of precomputed hashes, I can see that they match the CRC32 column and not the murmur2 column, which is what I would like to have.
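To make that comparison mechanical, a small helper along the lines below could classify each observed key/partition pair; it reuses the same hash computations as the test case above (the class and method names are just illustrative):

import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;

public class PartitionerCheck {
    static final int NUM_PARTITIONS = 10;

    // Report which partitioner the observed partition for a key corresponds to.
    static String classify(String key, int observedPartition) {
        byte[] bytes = key.getBytes();
        int murmur2 = Utils.toPositive(Utils.murmur2(bytes)) % NUM_PARTITIONS;
        Crc32 crc32 = new Crc32();
        crc32.update(bytes, 0, bytes.length);
        int crc = (int) (crc32.getValue() % NUM_PARTITIONS);
        if (observedPartition == murmur2) return key + ": matches murmur2";
        if (observedPartition == crc) return key + ": matches crc32 (consistent)";
        return key + ": matches neither";
    }

    public static void main(String[] args) {
        // Example using one of the kafkacat output lines above (ID-000 landed on partition 2).
        System.out.println(classify("ID-000", 2));
    }
}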
5. Let's try everything again using Partition strategy: Random
Out of curiosity I stopped the task, changed the Partition strategy from By message key to the other alternative, Random, and then started the task and reloaded the target again. After checking the output of the topic I could see that the partitions the messages went to did not match any of the partitions from the table, so I reloaded the table 2 more times to see whether it was consistent or random, and in this case I could conclude that Random truly is random. That leaves the conclusion that By message key somehow implies consistent (CRC32 hash of key), regardless of whether I override the partitioner using rdkafkaTopicProperties.

Hi Aleno,
please allow me some time; I need to do more investigation and will then come back to you.
Regards,
John.

Hello @aleno ,
sorry for the delay. We need additional investigation on this issue. I'd like to suggest that you create a Salesforce support case, and the support team will continue working on it.
Have a nice day,
John.
