Do not input private or sensitive data. View Qlik Privacy & Cookie Policy.
Skip to main content

Announcements
See why IDC MarketScape names Qlik a 2025 Leader! Read more
cancel
Showing results for 
Search instead for 
Did you mean: 
rnathan2020
Contributor
Contributor

Spark Streaming - Kafka client fails with "Couldn't find leaders for Set [topic,2])"

 Using Talend 6.5.1

I have a simple Spark Streaming job that retrieves a message from kafka and passes to tHMapRecord to tFileOutputDelimited. 

With or without the tHMapRecord, the job is failing with strange error message: 

 

org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([cd072_fb,2])
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)

The cd072_fb is a topic. I have googled for it but none of the suggested answers is satisfactory. 

 

I notice that the tKafkaInput is different for normal jobs and Spark Streaming. In Spark Streaming, there is no prompt for "group id". So I used Advanced Properties to set it:

 

No problems in the edge node (java or kafka console consumer) and the studio ( Talend standard job).

Any ideas would be appreciated.

Labels (3)
2 Replies
Anonymous
Not applicable

Hello,

What's Kafka broker version are you using? It looks like the topic does not exist.  Did you use tKafkaCreateTopic in your Talend standard job beforehand.?

Make sure that this topic has been properly created, you may check that within zookeeper either by browsing the znodes using Exhibitor (if installed on your environment) or using the command line.

Best regards

sABRINA

Anonymous
Not applicable

Hello,

About the differences between DI and Spark Streaming regarding Kafka components, please note these points :

In DI, we're using the Kafka API for both input and output. 
In Spark Streaming, the Kafka input is provided by a connector developed by Spark. A lot of things are delegated to this connector, allowing us to get an input DStream from a Kafka topic, so we're relying on an API provided by Spark on top of Kafka.

Kafka provides two types of consumers : a "High level consumer" which is easier to implement (it manages offsets, keeps track of partitions leaders while handling leader failures / leader re-election and many things by itself) and a "Simple consumer" which allows to customize behaviour to a very low level (almost everything is under the reponsibility of the developer).
These two consumers require different parameters to run, that's why you could see that some consumers need a broker list whereas some others actually need a zookeeper quorum list. In Spark Streaming, the connector is implemented using the "Simple consumer" so Spark is managing offsets by itself and we don't have control on that : Spark actually does not commit any offset to zookeeper or Kafka, it keeps track of the offsets within its checkpoints.
You can read some details about this connector on the Spark documentation (we're using the "Direct approach") : http://https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
If you really want to set the consumer group, you should be able to do that by providing the "group.id" property on the tKafkaInput advanced properties tab. Remember that Spark Streaming does not rely on zookeeper to track offsets.
Best regards

Sabrina