Hi,
I'm trying to stream unstructured, Avro-encoded message data from a Kafka topic using Scala. Normal messages (not Avro-encoded) work fine. The problem is decoding the Avro-encoded messages. Any help would be appreciated. The code is below:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

object Consumer4 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Kafka test")
      .getOrCreate();
    val sparkContext = SparkContext.getOrCreate()
    val streamingContext = new StreamingContext(sparkContext, Seconds(2))
    val preferredHosts = LocationStrategies.PreferConsistent
    val topics = List("xyz")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "xyz.xyz.com.tr:1111",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // stream.map(record => (record.key, record.value))
    stream.foreachRDD { rdd =>
      // val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        while (iter.hasNext) {
          val item = iter.next()
          println(item)
        }
        // val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        // println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Any suggestions or advice would be welcome.
Some help would be great, and I would appreciate it 🙂
Is the above code being implemented through Talend? The problem statement is not clear. Could you please explain?
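For what it is worth, one likely cause is that the consumer reads the values with StringDeserializer, which turns the binary Avro payload into a string and can corrupt it. Below is a minimal sketch of decoding raw Avro bytes with the Apache Avro library, assuming the messages are plain Avro binary and the writer schema is known. The schema, the record name Event, its fields, and the object name AvroDecodeSketch are all hypothetical placeholders, since the actual schema is not shown in the question. The encode helper exists only so the round trip can be tried locally.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroDecodeSketch {
  // Hypothetical schema: replace with the schema the producer actually used.
  val schemaJson: String =
    """{"type":"record","name":"Event","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"name","type":"string"}
      |]}""".stripMargin

  // Parsed lazily from the JSON string, so each JVM (driver or executor)
  // builds its own Schema instead of shipping one inside a closure.
  lazy val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Decode one plain Avro binary payload into a GenericRecord.
  def decode(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  // Encode a record to Avro binary; only here to test decode locally.
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val rec = new GenericData.Record(schema)
    rec.put("id", java.lang.Long.valueOf(42L))
    rec.put("name", "alice")
    println(decode(encode(rec)))
  }
}
```

To use something like this in the consumer above, the value deserializer would need to be classOf[ByteArrayDeserializer] (from org.apache.kafka.common.serialization), the stream would be created as createDirectStream[String, Array[Byte]] with Subscribe[String, Array[Byte]], and decode(item.value) would be called inside foreachPartition. If the producer uses the Confluent Schema Registry serializer, which is only a guess here, each payload starts with a 5-byte header (a zero magic byte plus a 4-byte schema ID) that has to be skipped before decoding.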