<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Kafka target: How to set partitioning strategy in Qlik Replicate</title>
    <link>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1751051#M528</link>
    <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.qlik.com/t5/user/viewprofilepage/user-id/127405"&gt;@aleno&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;The partitioner is a&amp;nbsp;topic configuration property (not a global one), so you can only set it through the internal parameter below:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;rdkafkaTopicProperties : partitioner=murmur2&lt;/LI-CODE&gt;&lt;P&gt;In Replicate version 6.6 it seems to work. At least the task log file says:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;2020-10-09T20:18:03:145819 [TARGET_APPLY    ]T:  Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;However, I'm not 100% sure which partitioner (&lt;SPAN&gt;random, consistent, murmur2, fnv1a...&lt;/SPAN&gt;) is actually in effect with the above settings. Could you please let me know how you confirmed that Replicate is "&lt;SPAN&gt;still partition using the crc32 hash of the message key&lt;/SPAN&gt;"?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;John.&lt;/P&gt;</description>
    <pubDate>Fri, 09 Oct 2020 12:41:26 GMT</pubDate>
    <dc:creator>john_wang</dc:creator>
    <dc:date>2020-10-09T12:41:26Z</dc:date>
    <item>
      <title>Kafka target: How to set partitioning strategy</title>
      <link>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1750772#M526</link>
      <description>&lt;P&gt;We're using the Qlik Replicate product to produce CDC changes to Kafka and would like to change the producer's partitioning strategy to "murmur2", which is the default for the Java Producer and Kafka Streams.&amp;nbsp;&lt;/P&gt;&lt;P&gt;In the documentation for librdkafka&amp;nbsp;&lt;A href="https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md" target="_blank"&gt;https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md&lt;/A&gt;&lt;BR /&gt;there is a producer property 'partitioner=murmur2' which should take care of this, except it does not: it will still partition using the crc32 hash of the message key.&amp;nbsp;&lt;/P&gt;&lt;P&gt;The reason I want to change the partitioner to match the one used in Kafka Streams is to be able to consume the CDC events from Qlik Replicate and maintain the transaction order of events. What's happening right now is that when the topic is consumed from Kafka Streams, the first thing that happens is a repartition step where each event from each partition gets rehashed using murmur2 and reassigned to the matching partition, and we lose the transaction order.&lt;/P&gt;&lt;P&gt;My Replicate settings summary looks like this:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;messageKey : PRIMARY_KEY
partitionMapping : MESSAGE_KEY
rdkafkaProperties : partitioner=murmur2
rdkafkaTopicProperties : partitioner=murmur2&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 08 Oct 2020 15:15:24 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1750772#M526</guid>
      <dc:creator>aleno</dc:creator>
      <dc:date>2020-10-08T15:15:24Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka target: How to set partitioning strategy</title>
      <link>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1751051#M528</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.qlik.com/t5/user/viewprofilepage/user-id/127405"&gt;@aleno&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;The partitioner is a&amp;nbsp;topic configuration property (not a global one), so you can only set it through the internal parameter below:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;rdkafkaTopicProperties : partitioner=murmur2&lt;/LI-CODE&gt;&lt;P&gt;In Replicate version 6.6 it seems to work. At least the task log file says:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;2020-10-09T20:18:03:145819 [TARGET_APPLY    ]T:  Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;However, I'm not 100% sure which partitioner (&lt;SPAN&gt;random, consistent, murmur2, fnv1a...&lt;/SPAN&gt;) is actually in effect with the above settings. Could you please let me know how you confirmed that Replicate is "&lt;SPAN&gt;still partition using the crc32 hash of the message key&lt;/SPAN&gt;"?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;John.&lt;/P&gt;</description>
      <pubDate>Fri, 09 Oct 2020 12:41:26 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1751051#M528</guid>
      <dc:creator>john_wang</dc:creator>
      <dc:date>2020-10-09T12:41:26Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka target: How to set partitioning strategy</title>
      <link>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1751526#M539</link>
      <description>&lt;P&gt;Hi again.&lt;/P&gt;&lt;P&gt;I &lt;FONT size="3"&gt;have&lt;/FONT&gt; tested this in a more controlled setting this time, and Qlik Replicate is still publishing to the target topic using &lt;STRONG&gt;consistent&lt;/STRONG&gt; (CRC32 hash of key) as the partitioner. I'll explain how I tested it, so bear with me.&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;1. Fixing the topic property&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;The first thing I did was to remove the global parameter and keep only &lt;STRONG&gt;rdkafkaTopicProperties&lt;/STRONG&gt;, as you instructed. Then I used the Log Manager to increase the log level to TRACE for both KAFKA_TARGET and KAFKA_LOAD.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;2. Creating a controlled source table and target&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;For this step I created a new table with one column, 'ID', and populated it with the values ID-000 to ID-009.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;I also set up a global transformation to add the column &lt;STRONG&gt;$key&lt;/STRONG&gt; and set it to the value of &lt;STRONG&gt;$ID&lt;/STRONG&gt;. This step was needed to get the Kafka message key as &lt;STRONG&gt;ID-000&lt;/STRONG&gt;&amp;nbsp;(i.e. unquoted) instead of the default behavior,&amp;nbsp;where the key is wrapped in quotes:&amp;nbsp;&lt;STRONG&gt;"ID-000"&lt;/STRONG&gt;.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;I also created the target topic in Kafka and set it up with &lt;STRONG&gt;10&lt;/STRONG&gt; partitions.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;3. Computing the expected partitions&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;FONT size="3"&gt;Since I know what my expected keys are going to be, I created a small test case that prints the murmur2 and crc32 hashed partition values for a Kafka topic with 10 partitions.&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;import org.apache.kafka.common.utils.Crc32;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

class PartitionerTest {
    private int NUM_PARTITIONS = 10;

    @Test
    void test_print_partition() {
        for (int i = 0; i &amp;lt; 10; i++) {
            print_partition(String.format("ID-%03d", i));
        }
    }

    void print_partition(String key) {
        int partition_murmur2 = murmur2(key.getBytes());
        int partition_crc32 = rd_crc32(key.getBytes());
        System.out.println(key + " Murmur2: " + partition_murmur2 + " CRC32: " + partition_crc32);
    }

    int rd_crc32(byte[] keyBytes) {
        Crc32 crc32 = new Crc32();
        crc32.update(keyBytes);
        long c = crc32.getValue();
        return (int) (c % NUM_PARTITIONS);
    }

    int murmur2(byte[] keyBytes) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % NUM_PARTITIONS;
    }
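
    // Hypothetical helper, not part of the original test: given a key and the
    // partition observed on the topic (for example from the kafkacat output),
    // report which of the two hashes computed above would explain it.
    // This is only a sketch that reuses murmur2() and rd_crc32() from this class.
    String matching_partitioner(String key, int observedPartition) {
        boolean isMurmur2 = murmur2(key.getBytes()) == observedPartition;
        boolean isCrc32 = rd_crc32(key.getBytes()) == observedPartition;
        if (isMurmur2) {
            return isCrc32 ? "both" : "murmur2";
        }
        return isCrc32 ? "crc32" : "neither";
    }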
}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Running this test case yielded the following result.&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;KEY		Murmur2(key) 	Crc32(key)
ID-000	4				2
ID-001	0				8
ID-002	5				6
ID-003	4				8
ID-004	9				3
ID-005	6				5
ID-006	9				7
ID-007	0				9
ID-008	6				4
ID-009	1				0&lt;/LI-CODE&gt;&lt;P&gt;&lt;FONT size="5"&gt;4. Starting the task and reloading the target&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;Now that everything is set up, the settings summary looks like this.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;brokers : ...
messageKey : PRIMARY_KEY
partitionMapping : MESSAGE_KEY
rdkafkaTopicProperties : partitioner=murmur2
topic : cdc-test-target&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;And I'm ready to start the task: &lt;STRONG&gt;Run&lt;/STRONG&gt; -&amp;gt; &lt;STRONG&gt;Reload Target...&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;After the Full Load completed, I searched the logs and found the line about setting the Kafka topic properties.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;00013012: 2020-10-12T09:35:24 [TARGET_LOAD     ]T:  Kafka topic properties: 'partitioner=murmur2'  (kafka_client.c:902)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I then went on to check the result on the Kafka topic.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;/ # kafkacat -b kafka-broker:9092 -C -t cdc-test-target -f '%p %k : %s\n'
0 ID-009 : {"magic": "atMSG", ...
3 ID-004 : {"magic": "atMSG", ...
6 ID-002 : {"magic": "atMSG", ...
9 ID-007 : {"magic": "atMSG", ...
2 ID-000 : {"magic": "atMSG", ...
5 ID-005 : {"magic": "atMSG", ...
8 ID-001 : {"magic": "atMSG", ...
8 ID-003 : {"magic": "atMSG", ...
4 ID-008 : {"magic": "atMSG", ...
7 ID-006 : {"magic": "atMSG", ...&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;The format string used here prints the partition number followed by the key, then a : and the value (output truncated here). Comparing the partition numbers against the keys in the table of precomputed hashes, I can see that they match the CRC32 column and not the murmur2 column, which is the one I would like them to match.&lt;/P&gt;&lt;P&gt;&lt;FONT size="5"&gt;5. Let's try everything again using&amp;nbsp;&lt;SPAN&gt;Partition strategy: Random&lt;/SPAN&gt;&lt;/FONT&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Out of curiosity I stopped the task and changed the&amp;nbsp;&lt;STRONG&gt;Partition strategy&lt;/STRONG&gt; from &lt;STRONG&gt;By message key&lt;/STRONG&gt; to the other alternative, &lt;STRONG&gt;Random&lt;/STRONG&gt;, before starting the task and reloading the target again. After checking the output of the topic, I could see that the partitions the messages went to did not match either column of the table, so I reloaded the table two more times to see whether the assignment was consistent or random, and I could conclude that random truly is random. That leaves the case that &lt;STRONG&gt;By message key&lt;/STRONG&gt; somehow implies &lt;STRONG&gt;consistent&lt;/STRONG&gt;&amp;nbsp;(CRC32 hash of key) regardless of whether I override the &lt;STRONG&gt;partitioner&lt;/STRONG&gt; using &lt;STRONG&gt;rdkafkaTopicProperties&lt;/STRONG&gt;.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 12 Oct 2020 10:55:05 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1751526#M539</guid>
      <dc:creator>aleno</dc:creator>
      <dc:date>2020-10-12T10:55:05Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka target: How to set partitioning strategy</title>
      <link>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1753124#M561</link>
      <description>&lt;P&gt;Hi Aleno,&lt;/P&gt;&lt;P&gt;Please allow me some time; I need to investigate further and will then come back to you.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;John.&lt;/P&gt;</description>
      <pubDate>Fri, 16 Oct 2020 08:33:50 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1753124#M561</guid>
      <dc:creator>john_wang</dc:creator>
      <dc:date>2020-10-16T08:33:50Z</dc:date>
    </item>
    <item>
      <title>Re: Kafka target: How to set partitioning strategy</title>
      <link>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1754315#M566</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.qlik.com/t5/user/viewprofilepage/user-id/127405"&gt;@aleno&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Sorry for the delay. This issue needs additional investigation, so I'd suggest you create a Salesforce support case, and the support team will continue working on it.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Have a nice day,&lt;/P&gt;&lt;P&gt;John.&lt;/P&gt;</description>
      <pubDate>Wed, 21 Oct 2020 02:38:33 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Qlik-Replicate/Kafka-target-How-to-set-partitioning-strategy/m-p/1754315#M566</guid>
      <dc:creator>john_wang</dc:creator>
      <dc:date>2020-10-21T02:38:33Z</dc:date>
    </item>
  </channel>
</rss>

