<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic decoding Avro messages in a spark scala streaming code. in Talend Studio</title>
    <link>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321188#M91248</link>
    <description>&lt;P&gt;&lt;SPAN&gt;Hi,&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;I'm trying to stream unstructured encoded avro message data from a kafka topic via scala. normal message (not avro encoded) is working fine. Problem is decoding avro encoded message. Any help will be fine. &amp;nbsp;Code part is below:&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.kafka.common.serialization.StringDeserializer&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark._&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.sql.SparkSession&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming._&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming.kafka010._&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;object Consumer4 {&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; def main(args: Array[String]): Unit = {&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val spark = SparkSession&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .builder()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .appName("Kafka test")&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .getOrCreate();&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val sparkContext = SparkContext.getOrCreate()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val streamingContext = new StreamingContext(sparkContext, Seconds(2))&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val preferredHosts = 
LocationStrategies.PreferConsistent&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val topics = List("xyz") //&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val kafkaParams = Map[String, Object](&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "bootstrap.servers" -&amp;gt; "xyz.xyz.com.tr:1111",&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "key.deserializer" -&amp;gt; classOf[StringDeserializer],&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "value.deserializer" -&amp;gt; classOf[StringDeserializer],&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "group.id" -&amp;gt; "use_a_separate_group_id_for_each_stream",&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "auto.offset.reset" -&amp;gt; "latest",&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "enable.auto.commit" -&amp;gt; (false: java.lang.Boolean)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val stream = KafkaUtils.createDirectStream[String, String](&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; streamingContext,&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; PreferConsistent,&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; Subscribe[String, String](topics, kafkaParams)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp;stream.map(record =&amp;gt; (record.key, record.value))&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; stream.foreachRDD { rdd =&amp;gt;&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp; 
&amp;nbsp;val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; rdd.foreachPartition { iter =&amp;gt;&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; while (iter.hasNext) {&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val item = iter.next()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; println(item)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; streamingContext.start()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; streamingContext.awaitTermination()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;}&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Sat, 16 Nov 2024 08:32:07 GMT</pubDate>
    <dc:creator>Anonymous</dc:creator>
    <dc:date>2024-11-16T08:32:07Z</dc:date>
    <item>
      <title>decoding Avro messages in a spark scala streaming code.</title>
      <link>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321188#M91248</link>
      <description>&lt;P&gt;&lt;SPAN&gt;Hi,&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;I'm trying to stream unstructured encoded avro message data from a kafka topic via scala. normal message (not avro encoded) is working fine. Problem is decoding avro encoded message. Any help will be fine. &amp;nbsp;Code part is below:&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.kafka.common.serialization.StringDeserializer&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark._&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.sql.SparkSession&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming._&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;import org.apache.spark.streaming.kafka010._&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;object Consumer4 {&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; def main(args: Array[String]): Unit = {&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val spark = SparkSession&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .builder()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .appName("Kafka test")&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; .getOrCreate();&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val sparkContext = SparkContext.getOrCreate()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val streamingContext = new StreamingContext(sparkContext, Seconds(2))&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val preferredHosts = 
LocationStrategies.PreferConsistent&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val topics = List("xyz") //&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val kafkaParams = Map[String, Object](&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "bootstrap.servers" -&amp;gt; "xyz.xyz.com.tr:1111",&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "key.deserializer" -&amp;gt; classOf[StringDeserializer],&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "value.deserializer" -&amp;gt; classOf[StringDeserializer],&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "group.id" -&amp;gt; "use_a_separate_group_id_for_each_stream",&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "auto.offset.reset" -&amp;gt; "latest",&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; "enable.auto.commit" -&amp;gt; (false: java.lang.Boolean)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; val stream = KafkaUtils.createDirectStream[String, String](&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; streamingContext,&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; PreferConsistent,&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; Subscribe[String, String](topics, kafkaParams)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp;stream.map(record =&amp;gt; (record.key, record.value))&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; stream.foreachRDD { rdd =&amp;gt;&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp; 
&amp;nbsp;val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; rdd.foreachPartition { iter =&amp;gt;&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; while (iter.hasNext) {&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; val item = iter.next()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; println(item)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; // &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; streamingContext.start()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; streamingContext.awaitTermination()&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; }&amp;nbsp;&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;}&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Sat, 16 Nov 2024 08:32:07 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321188#M91248</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2024-11-16T08:32:07Z</dc:date>
    </item>
    <item>
      <title>Re: decoding Avro messages in a spark scala streaming code.</title>
      <link>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321189#M91249</link>
      <description>&lt;P&gt;There should at least be some suggestions or advice.&lt;/P&gt;&lt;P&gt;Some help would be great, and I would appreciate it &lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 20 Mar 2018 07:47:43 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321189#M91249</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2018-03-20T07:47:43Z</dc:date>
    </item>
    <item>
      <title>Re: decoding Avro messages in a spark scala streaming code.</title>
      <link>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321190#M91250</link>
      <description>&lt;P&gt;Is the above code being implemented through Talend? The problem statement is not clear; please explain.&lt;/P&gt;</description>
      <pubDate>Mon, 26 Mar 2018 02:57:03 GMT</pubDate>
      <guid>https://community.qlik.com/t5/Talend-Studio/decoding-Avro-messages-in-a-spark-scala-streaming-code/m-p/2321190#M91250</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2018-03-26T02:57:03Z</dc:date>
    </item>
  </channel>
</rss>

