Skip to main content
Announcements
Introducing a new Enhanced File Management feature in Qlik Cloud! GET THE DETAILS!
cancel
Showing results for 
Search instead for 
Did you mean: 
Alexist
Contributor II
Contributor II

Kafka in Talend 8.0.1

Hi everyone,

I have a problem with Kafka in Talend 8.0.1 . In simple words, I succed to lauch the Zookeeper service and the kafka service which are on ubuntu2004 . I also succed to connect myself to my brooker in Talend and create topics.

The problem is, using this same tKafkaConnection, I don't succed to send a message in bytes.

Here the flow in question :

0695b00000WtNPmAAN.png 

With this job, Like I said I succed to connect to the broker to create a topics but not to send a message in bytes to my topic.

Here is the flow after I click on run : 

0695b00000WtNVFAA3.png 

And the error message :

[INFO ] 17:11:41 org.apache.kafka.common.utils.AppInfoParser- Kafka version : 1.1.0

[INFO ] 17:11:41 org.apache.kafka.common.utils.AppInfoParser- Kafka commitId : fdcf75ea326b8e07 [INFO ] 17:11:41 sandbox.kafkatopic_0_1.KafkaTopic- tFileInputDelimited_1 - Retrieving records from the datasource.

[INFO ] 17:11:41 sandbox.kafkatopic_0_1.KafkaTopic- tLogRow_2 - Content of row 1: test d'envois de message dans kafka

[INFO ] 17:11:41 sandbox.kafkatopic_0_1.KafkaTopic- tLogRow_1 - Content of row 1: test d'envois de message dans kafka

[WARN ] 17:11:43 org.apache.kafka.clients.NetworkClient- [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.

 

I use this Kafka version : kafka_2.13-3-2-1

For the records, in the tKafkaConnection, I select the version 1.1.0 because with the newest version of kafka in this comp, I didn't even succed to create a topic :

0695b00000WtNVeAAN.png 

Would you be able to help me please ?

On a second time I tried to implement a SSL/TLS security. I am having issues with this too.

Hope you could have some int for me with this

Best regards

Labels (3)
7 Replies
Anonymous
Not applicable

Sorry it has taken me a while to reply @Alexis Taine​. I have been working through recreating this environment as close as I can and have managed to get it working. First of all, I used the Kafka 2.4.X version. I believe you need to use this one.

 

Why have you got two tKafkaCreateTopic components?

 

When you tried to create the Topic, did it fail because the Topic already exists?

Alexist
Contributor II
Contributor II
Author

Hi rhall, thank you for your answer.

When I used the 2.4.X version, I can't even create a topic.

 

But with the version 1.1.0 as I showed, I succed to create my 2 topics.

No, the error is not at this level. Plus, I put my tKafkaCreateTopic on "Create if don't exist".

 

Finaly, on my flux like this, I succed to connect on the first hand with the tKafkaConnection, I create my 2 topics, then the error is when I try to send a message in bytes.

Anonymous
Not applicable

If you are using Kafka 2.13-3.2.1 then 2.4.X should be working. I am using it. So I am not sure why this isn't working for you. However, since 1.1.0 is, then maybe you can continue with that.

 

Regarding your message error, I have found this which may help.....

https://stackoverflow.com/questions/56161345/connection-to-node-1-could-not-be-established-broker-may-not-be-available-or

 

It looks like it could be an issue with your config for Kafka.

 

However, it could be that you are not converting your String data to a bit array before sending it. This can be easily done using code like this....

 

row2.serializedValue = row1.serializedValue.getBytes();

 

The above is taken from a tJavaFlex I used to receive a String value and output a bit array.

Alexist
Contributor II
Contributor II
Author

Hi rhall,

thank you for your time. I test and retest everything you suggest me.

Sadly that didn't solve my problem.

With the stackoverflow post I still can connect to my broker then create topics but not be able to send messages.

 

Also, my message is well and trully in Bytes.

 

I'll keep looking and if I find a solution I'll post it here.

Anonymous
Not applicable

Are you converting your Strings from the file to bytes or are they already in bytes? Bytes read as Strings may cause you issues as well. The column type being passed to the Kafka component must be byte[] (or byte array).

Alexist
Contributor II
Contributor II
Author

Yes, my data pass from String to Bytes properly thanks to : row4.serializedValue.getBytes(); in my tMap .

 

I also tried with :

String example = input_row.serializedValue;

byte[] serializedValue = example.getBytes();

 

output_row.serializedValue = serializedValue;

 

In a tJavaRow.

 

Sadly, always the same error.

Anonymous
Not applicable

I'm afraid I cannot recreate this issue. As I said, I have a bog standard Kafka configuration and am able to use the 2.4.X Kafka version. I suspect that the problem here is with your Kafka configuration and you may need to identify how this is configured before you can solve this issue.