Anonymous
Not applicable

Decoding Avro messages in Spark Scala streaming code

Hi,

I'm trying to stream Avro-encoded message data from a Kafka topic using Scala. Plain messages (not Avro-encoded) work fine; the problem is decoding the Avro-encoded ones. Any help would be appreciated. The relevant code is below:

import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010._ 

object Consumer4 { 
  def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
      .builder() 
      .appName("Kafka test") 
      .getOrCreate(); 

    val sparkContext = SparkContext.getOrCreate() 
    val streamingContext = new StreamingContext(sparkContext, Seconds(2)) 
    val preferredHosts = LocationStrategies.PreferConsistent 
    val topics = List("xyz") // 

    val kafkaParams = Map[String, Object]( 
      "bootstrap.servers" -> "xyz.xyz.com.tr:1111", 
      "key.deserializer" -> classOf[StringDeserializer], 
      "value.deserializer" -> classOf[StringDeserializer], 
      "group.id" -> "use_a_separate_group_id_for_each_stream", 
      "auto.offset.reset" -> "latest", 
      "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    val stream = KafkaUtils.createDirectStream[String, String]( 
      streamingContext, 
      PreferConsistent, 
      Subscribe[String, String](topics, kafkaParams) 
    ) 
    //    stream.map(record => (record.key, record.value)) 
    stream.foreachRDD { rdd => 
      //      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
      rdd.foreachPartition { iter => 

        while (iter.hasNext) { 
          val item = iter.next() 
          println(item) 
        } 

        //        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 
        //        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
      } 
    } 
    streamingContext.start() 
    streamingContext.awaitTermination() 
  }
}

2 Replies
Anonymous
Not applicable
Author

There should at least be some suggestions or advice.

Any help would be great, and I would appreciate it 🙂

Anonymous
Not applicable
Author

Is the above code being implemented through Talend? The problem statement is not clear; could you please explain?
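
If the question is only about turning the Avro payload back into a record, the decoding step on its own might look like the sketch below. It is a minimal sketch under several assumptions, not a confirmed fix: it assumes the messages are plain Avro binary with no extra framing (messages produced through the Confluent Schema Registry serializer carry a 5-byte header that would have to be skipped first), that the writer schema is known in advance, and that the Kafka consumer is configured with `ByteArrayDeserializer` for the value and a stream of type `[String, Array[Byte]]`, since reading the value through `StringDeserializer` can corrupt binary data. The `Event` schema and the `AvroDecodeSketch` object are hypothetical names used only for illustration.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroDecodeSketch {
  // Hypothetical schema for illustration; replace it with the topic's real writer schema.
  val schemaJson: String =
    """{"type":"record","name":"Event","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"name","type":"string"}
      |]}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Decode one plain Avro binary payload into a GenericRecord.
  def decode(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  // Encode a record to Avro binary; used here only to produce sample bytes.
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Build a sample record, round-trip it through Avro binary, and print it.
    val record = new GenericData.Record(schema)
    record.put("id", Long.box(42L))
    record.put("name", "example")
    println(decode(encode(record)))
  }
}
```

Inside the streaming job, `decode` would be called on `item.value()` in the `foreachPartition` loop once the stream carries `Array[Byte]` values. Note that Avro returns string fields as `Utf8` objects, so call `toString` on them when a Scala `String` is needed.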