_AnonymousUser
Specialist III

Issue running Kafka with Spark Streaming

Hello,
I am running Talend Real Time for Big Data, and I have tried a simple Spark Streaming job that reads messages from a Kafka topic and writes them to a file.
My job contains a tHDFSConfiguration and a tKafkaInput linked to a tFileOutputJSON.
When I build the job and launch it in my MapR VM, it aborts with the following error:
java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
        at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
        at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$8.apply(KafkaUtils.scala:411)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$8.apply(KafkaUtils.scala:409)
        at scala.util.Either$RightProjection.flatMap(Either.scala:523)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at klf.test_kafka_0_1.test_kafka.tKafkaInput_1Process(test_kafka.java:539)
        at klf.test_kafka_0_1.test_kafka.run(test_kafka.java:910)
        at klf.test_kafka_0_1.test_kafka.runJobInTOS(test_kafka.java:863)
        at klf.test_kafka_0_1.test_kafka.main(test_kafka.java:744)

I tried changing the pom.xml settings in the project properties according to the answer given here: . It had no effect.
Has anyone encountered this problem, or does anyone have an idea how to solve it?
3 Replies
Anonymous
Not applicable

Hi,
Would you mind posting screenshots of your settings to the forum? They will help us address your issue.
Best regards
Sabrina
Anonymous
Not applicable

Please find below screenshots of the job design, the job properties, and the tKafkaInput properties:
0683p000009MBWU.jpg 0683p000009MBWZ.jpg
Anonymous
Not applicable

Did you download the MapR sandbox from the Talend website or the MapR website?
What version of MapR are you using?
Keep in mind that MapR's implementation of Kafka differs from that of other Hadoop vendors, so the Kafka input and output components may not work. Support for MapR Streams is on the roadmap:
https://jira.talendforge.org/browse/TBD-3927
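
A ClassCastException between kafka.cluster.BrokerEndPoint and kafka.cluster.Broker, like the one in the stack trace above, is often a sign that the Kafka classes loaded at runtime come from a different Kafka version than the one the Spark Kafka integration was compiled against. That is an assumption here, not something this thread confirms. A minimal diagnostic sketch, assuming it is run with the same classpath as the job, prints which jar each class is loaded from. The class name ClassOrigin and the method locate are hypothetical helpers, not part of Talend, Spark, or Kafka.

```java
import java.security.CodeSource;

public class ClassOrigin {

    // Returns "<class name>: <jar or directory it was loaded from>",
    // or a note if the class cannot be found on the current classpath.
    public static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> c = Class.forName(className, false,
                    ClassOrigin.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            return className + ": "
                    + (src == null ? "JDK runtime (no code source)" : src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return className + ": not on classpath";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        System.out.println(locate("kafka.cluster.Broker"));
        System.out.println(locate("kafka.cluster.BrokerEndPoint"));
        System.out.println(locate("org.apache.spark.streaming.kafka.KafkaCluster"));
    }
}
```

If the Kafka classes resolve to a jar whose version differs from the one the Spark Kafka integration jar expects, that mismatch would be the first thing to look at. On a MapR cluster the jars may be MapR-specific builds, as the reply above points out.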