<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic tKafkaInputAvro fails when deserializing Avro Message in Talend Studio</title>
    <link>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369917#M133019</link>
    <description>&lt;P&gt;We are trying to deserialize an Avro message from a Kafka Topic in a Big Data Streaming job.&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;For this attempt, we are trying to use the tKafkaInputAvro component but continue to get errors when the job tries to deserialize the message.&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;How do you configure the tKafkaInputAvro component so that it successfully deserializes an Avro message from a Kafka Topic?&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;Avro Schema:&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;PRE&gt;{
  "namespace": "example",
&amp;nbsp; "type": "record",
&amp;nbsp; "name": "KafkaInputAvro",
&amp;nbsp; "fields": [
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; {"name": "Name", "type": "string"},
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; {"name": "Color",&amp;nbsp; "type": ["string", "null"]},
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; {"name": "Number", "type": ["int", "null"]}
&amp;nbsp;]
}&lt;/PRE&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;Java Publisher:&lt;/P&gt; 
&lt;P&gt;Messages are published to Kafka by Java code. We have tried sending the message both with and without a key:&lt;/P&gt; 
&lt;PRE&gt;Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

KafkaProducer&amp;lt;String, byte[]&amp;gt; messageProducer = new KafkaProducer&amp;lt;String, byte[]&amp;gt;(props);
ProducerRecord&amp;lt;String, byte[]&amp;gt; producerRecord = null;
producerRecord = new ProducerRecord&amp;lt;String, byte[]&amp;gt;(TOPIC, data.toByteArray());
//producerRecord = new ProducerRecord&amp;lt;String, byte[]&amp;gt;(TOPIC, "1", data.toByteArray());
messageProducer.send(producerRecord);&lt;/PRE&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;Talend Job - Consumer:&lt;/P&gt; 
&lt;P&gt;Here is what the consumer job looks like. The tKafkaInputAvro component has been tried with “Use hierarchical mode” both enabled and disabled; regardless of the setting, the deserialization fails.&lt;SPAN class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="KafkaInputAvro1.png" style="width: 964px;"&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0683p000009LyKC.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/151906i7A55189832C9CEA6/image-size/large?v=v2&amp;amp;px=999" role="button" title="0683p000009LyKC.png" alt="0683p000009LyKC.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;tKafkaInputAvro Schema:&lt;SPAN class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="KafkaInputAvro3.png" style="width: 551px;"&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0683p000009Lv1m.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/151652i126C90DF2BEE94CD/image-size/large?v=v2&amp;amp;px=999" role="button" title="0683p000009Lv1m.png" alt="0683p000009Lv1m.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;When set to Use hierarchical mode:&lt;BR /&gt;&lt;SPAN class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="KafkaInputAvro2.png" style="width: 999px;"&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0683p000009LyW8.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/155845iD56CE0B940FFE369/image-size/large?v=v2&amp;amp;px=999" role="button" title="0683p000009LyW8.png" alt="0683p000009LyW8.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;When the job runs and tries to deserialize the message, we get the following exceptions (stack trace trimmed; see the attached file for the complete stack trace):&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;PRE&gt;Starting job ReadMessage_tKafkaInputAvro at 07:58 06/06/2018.
[statistics] connecting to socket on port 4016
[statistics] connected
[WARN ]: org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[WARN ]: org.apache.spark.SparkConf - In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding enable.auto.commit to false for executor
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding auto.offset.reset to none for executor
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding executor group.id to spark-executor-example.KafkaInputAvroGroup1
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding receive.buffer.bytes to 65536 see KAFKA-3135
[WARN ]: org.apache.kafka.clients.consumer.ConsumerConfig - The configuration serializer.encoding = UTF-8 was supplied but isn't a known config.
[WARN ]: org.apache.kafka.clients.consumer.ConsumerConfig - The configuration serializer.encoding = UTF-8 was supplied but isn't a known config.
[ERROR]: org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition example.KafkaInputAvro-0 at offset 1
…
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
…
at talend_proj.readmessage_tkafkainputavro_0_1.ReadMessage_tKafkaInputAvro$tKafkaInputAvro_1_KafkaAbstractInputAvro_ValueDeserializer.deserialize(ReadMessage_tKafkaInputAvro.java:423)
at talend_proj.readmessage_tkafkainputavro_0_1.ReadMessage_tKafkaInputAvro$tKafkaInputAvro_1_KafkaAbstractInputAvro_ValueDeserializer.deserialize(ReadMessage_tKafkaInputAvro.java:1)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:655)
... 41 more
[ERROR]: org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition example.KafkaInputAvro-0 at offset 1
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:665)
…&lt;/PRE&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Sat, 16 Nov 2024 08:08:40 GMT</pubDate>
    <dc:creator>Anonymous</dc:creator>
    <dc:date>2024-11-16T08:08:40Z</dc:date>
    <item>
      <title>tKafkaInputAvro fails when deserializing Avro Message</title>
      <link>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369917#M133019</link>
      <description>&lt;P&gt;We are trying to deserialize an Avro message from a Kafka Topic in a Big Data Streaming job.&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;For this attempt, we are trying to use the tKafkaInputAvro component but continue to get errors when the job tries to deserialize the message.&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;How do you configure the tKafkaInputAvro component so that it successfully deserializes an Avro message from a Kafka Topic?&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;Avro Schema:&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;PRE&gt;{
  "namespace": "example",
&amp;nbsp; "type": "record",
&amp;nbsp; "name": "KafkaInputAvro",
&amp;nbsp; "fields": [
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; {"name": "Name", "type": "string"},
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; {"name": "Color",&amp;nbsp; "type": ["string", "null"]},
&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; {"name": "Number", "type": ["int", "null"]}
&amp;nbsp;]
}&lt;/PRE&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;Java Publisher:&lt;/P&gt; 
&lt;P&gt;Messages are published to Kafka by Java code. We have tried sending the message both with and without a key:&lt;/P&gt; 
&lt;PRE&gt;Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

KafkaProducer&amp;lt;String, byte[]&amp;gt; messageProducer = new KafkaProducer&amp;lt;String, byte[]&amp;gt;(props);
ProducerRecord&amp;lt;String, byte[]&amp;gt; producerRecord = null;
producerRecord = new ProducerRecord&amp;lt;String, byte[]&amp;gt;(TOPIC, data.toByteArray());
//producerRecord = new ProducerRecord&amp;lt;String, byte[]&amp;gt;(TOPIC, "1", data.toByteArray());
messageProducer.send(producerRecord);&lt;/PRE&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;Talend Job - Consumer:&lt;/P&gt; 
&lt;P&gt;Here is what the consumer job looks like. The tKafkaInputAvro component has been tried with “Use hierarchical mode” both enabled and disabled; regardless of the setting, the deserialization fails.&lt;SPAN class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="KafkaInputAvro1.png" style="width: 964px;"&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0683p000009LyKC.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/151906i7A55189832C9CEA6/image-size/large?v=v2&amp;amp;px=999" role="button" title="0683p000009LyKC.png" alt="0683p000009LyKC.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;tKafkaInputAvro Schema:&lt;SPAN class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="KafkaInputAvro3.png" style="width: 551px;"&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0683p000009Lv1m.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/151652i126C90DF2BEE94CD/image-size/large?v=v2&amp;amp;px=999" role="button" title="0683p000009Lv1m.png" alt="0683p000009Lv1m.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;When set to Use hierarchical mode:&lt;BR /&gt;&lt;SPAN class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="KafkaInputAvro2.png" style="width: 999px;"&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0683p000009LyW8.png"&gt;&lt;img src="https://community.qlik.com/t5/image/serverpage/image-id/155845iD56CE0B940FFE369/image-size/large?v=v2&amp;amp;px=999" role="button" title="0683p000009LyW8.png" alt="0683p000009LyW8.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;When the job runs and tries to deserialize the message, we get the following exceptions (stack trace trimmed; see the attached file for the complete stack trace):&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;PRE&gt;Starting job ReadMessage_tKafkaInputAvro at 07:58 06/06/2018.
[statistics] connecting to socket on port 4016
[statistics] connected
[WARN ]: org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[WARN ]: org.apache.spark.SparkConf - In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding enable.auto.commit to false for executor
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding auto.offset.reset to none for executor
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding executor group.id to spark-executor-example.KafkaInputAvroGroup1
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding receive.buffer.bytes to 65536 see KAFKA-3135
[WARN ]: org.apache.kafka.clients.consumer.ConsumerConfig - The configuration serializer.encoding = UTF-8 was supplied but isn't a known config.
[WARN ]: org.apache.kafka.clients.consumer.ConsumerConfig - The configuration serializer.encoding = UTF-8 was supplied but isn't a known config.
[ERROR]: org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition example.KafkaInputAvro-0 at offset 1
…
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
…
at talend_proj.readmessage_tkafkainputavro_0_1.ReadMessage_tKafkaInputAvro$tKafkaInputAvro_1_KafkaAbstractInputAvro_ValueDeserializer.deserialize(ReadMessage_tKafkaInputAvro.java:423)
at talend_proj.readmessage_tkafkainputavro_0_1.ReadMessage_tKafkaInputAvro$tKafkaInputAvro_1_KafkaAbstractInputAvro_ValueDeserializer.deserialize(ReadMessage_tKafkaInputAvro.java:1)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:655)
... 41 more
[ERROR]: org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition example.KafkaInputAvro-0 at offset 1
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:665)
…&lt;/PRE&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt; 
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Sat, 16 Nov 2024 08:08:40 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369917#M133019</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2024-11-16T08:08:40Z</dc:date>
    </item>
    <item>
      <title>Re: tKafkaInputAvro fails when deserializing Avro Message</title>
      <link>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369918#M133020</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Could you please clarify which Talend version/edition you are using?&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;Best regards&lt;/P&gt;
&lt;P&gt;Sabrina&lt;/P&gt;</description>
      <pubDate>Tue, 11 Sep 2018 13:57:27 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369918#M133020</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2018-09-11T13:57:27Z</dc:date>
    </item>
    <item>
      <title>Re: tKafkaInputAvro fails when deserializing Avro Message</title>
      <link>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369919#M133021</link>
      <description>&lt;P&gt;We are running Talend Studio 6.4.1.&lt;/P&gt;</description>
      <pubDate>Tue, 11 Sep 2018 17:16:23 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369919#M133021</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2018-09-11T17:16:23Z</dc:date>
    </item>
    <item>
      <title>Re: tKafkaInputAvro fails when deserializing Avro Message</title>
      <link>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369920#M133022</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt; 
&lt;P&gt;There is a new-feature Jira issue for "Support for reading Kafka protobuf serialized hierarchical data".&lt;/P&gt; 
&lt;P&gt;&lt;A title="https://jira.talendforge.org/browse/TBD-7141" href="https://jira.talendforge.org/browse/TBD-7141" target="_self" rel="nofollow noopener noreferrer"&gt;https://jira.talendforge.org/browse/TBD-7141&lt;/A&gt;&lt;/P&gt; 
&lt;P&gt;Support for Kafka Avro serialized hierarchical data would be a new feature as well.&lt;/P&gt; 
&lt;P&gt;If you have a subscription, have you already created a case on the Talend support portal?&lt;/P&gt; 
&lt;P&gt;Best regards&lt;/P&gt; 
&lt;P&gt;Sabrina&lt;/P&gt;</description>
      <pubDate>Wed, 12 Sep 2018 04:03:06 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/tKafkaInputAvro-fails-when-deserializing-Avro-Message/m-p/2369920#M133022</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2018-09-12T04:03:06Z</dc:date>
    </item>
  </channel>
</rss>

