Anonymous
Not applicable

Decoding Avro messages in Spark Scala streaming code

Hi, 

I'm trying to stream Avro-encoded messages from a Kafka topic using Spark Streaming in Scala. Plain (non-Avro) messages work fine; the problem is decoding the Avro-encoded ones. Any help would be appreciated. The code is below:

import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010._ 

object Consumer4 { 
  def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
      .builder() 
      .appName("Kafka test") 
      .getOrCreate() 

    val sparkContext = SparkContext.getOrCreate() 
    val streamingContext = new StreamingContext(sparkContext, Seconds(2)) 
    val preferredHosts = LocationStrategies.PreferConsistent 
    val topics = List("xyz") 

    val kafkaParams = Map[String, Object]( 
      "bootstrap.servers" -> "xyz.xyz.com.tr:1111", 
      "key.deserializer" -> classOf[StringDeserializer], 
      "value.deserializer" -> classOf[StringDeserializer], 
      "group.id" -> "use_a_separate_group_id_for_each_stream", 
      "auto.offset.reset" -> "latest", 
      "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    val stream = KafkaUtils.createDirectStream[String, String]( 
      streamingContext, 
      PreferConsistent, 
      Subscribe[String, String](topics, kafkaParams) 
    ) 
    //    stream.map(record => (record.key, record.value)) 
    stream.foreachRDD { rdd => 
      //      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
      rdd.foreachPartition { iter => 

        while (iter.hasNext) { 
          val item = iter.next() 
          println(item) 
        } 

        //        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 
        //        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
      } 
    } 
    streamingContext.start() 
    streamingContext.awaitTermination() 
  } 
} 

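One common approach is to consume the value as raw bytes and decode it with the Avro library on the executor side. Below is a minimal sketch of the relevant changes, assuming the messages are plain binary-encoded Avro with a writer schema you know (the `Event` schema here is a placeholder, not the real one). If the producer uses Confluent's Schema Registry serializer, the payload is framed with a magic byte plus a 4-byte schema id, and this direct decode will not work as-is.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

// Placeholder writer schema -- replace with the schema your producer actually uses.
val schemaJson =
  """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "xyz.xyz.com.tr:1111",
  "key.deserializer" -> classOf[StringDeserializer],
  // Read the value as raw bytes; Avro decoding happens per record below.
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  streamingContext,
  PreferConsistent,
  Subscribe[String, Array[Byte]](List("xyz"), kafkaParams)
)

stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // Schema and reader are not serializable, so build them once per partition
    // on the executor rather than on the driver.
    val schema = new Schema.Parser().parse(schemaJson)
    val reader = new GenericDatumReader[GenericRecord](schema)
    iter.foreach { record =>
      val decoder = DecoderFactory.get().binaryDecoder(record.value(), null)
      val avroRecord: GenericRecord = reader.read(null, decoder)
      println(avroRecord)
    }
  }
}
```

The key change is `value.deserializer` -> `ByteArrayDeserializer` (with the stream typed `[String, Array[Byte]]`): your original code deserializes the value as a `String`, which mangles the binary Avro payload before you ever get a chance to decode it.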
2 Replies
Anonymous
Not applicable
Author

Any suggestions or advice would be welcome.

Some help would be great and I would appreciate it 🙂 

Anonymous
Not applicable
Author

Is the above code being implemented through Talend? The problem statement is not clear; please explain.