adbdkb
Creator

Kafka Target endpoint - Separate topic for each table

When setting up Kafka as the target endpoint and selecting "Separate topic for each table", how do we define the topic names? Is there a pattern that needs to be followed? The dialog doesn't ask for a pattern or a list of topics.

 

adbdkb_0-1651330007728.png

 

Thank you

 

13 Replies
SwathiPulagam
Support

@adbdkb 

Here is the process to follow to create a different topic for each table.

https://help.qlik.com/en-US/replicate/November2021/Content/Replicate/Main/Kafka/kafka_override_defau...

A transformation can be defined that overrides the topic, partition and message key settings defined in the General tab.

Note: Before you can define such a transformation, you first need to add a source endpoint to the task and select the tables you want to replicate.

To define a transformation:

1. Open the task you defined.

2. If you are defining a transformation for a single table, select one of the source tables. Otherwise, skip to Step 3.

3. Define a Global transformation that adds one of the following columns:

Note: The columns listed below (prefixed with a $) instruct Replicate to route the message to the desired topic and/or partition, and will not be included in the actual message itself.

$topic - To write messages to a specific topic.

$partition - To write messages to a specific partition.

$key - To create a custom message key.

4. Define an expression for the new column that returns the following values:

For a $topic column, the expression should return the topic name.

For a $partition column, the expression should return the partition number. Note that an error will be returned during runtime if the partition number does not exist.

For a $key column, the expression should return the message key contents. Note that the expression must return a non-empty value.

Example:

 

SwathiPulagam_0-1651361449858.png

Thanks,

Swathi

adbdkb
Creator
Author

Thank you @SwathiPulagam. I have a couple of questions. Some of the answers to my questions are getting automatically accepted without me doing so. Why is that happening?

The other question is: when I have a follow-up question related to an answer, and I post it after the answer has been accepted by someone else, does the question still get looked at?

 

When I tried to create the topic transformation as explained above, do I need to define all 3 variables (topic, partition and key), or is defining the topic enough? Also, I presume that I must have the topics ready and each table will automatically select the correct topic.

The reason I have these questions now is that after defining the topic variable as below, if the topic doesn't exist, I get the error below. I was expecting the error to show the full topic name that would be generated by the transformation expression.

Failed to produce kafka message with record id <2> to partition <-1> in topic '__Qlik_Topic'. Broker: Unknown topic or partition

adbdkb_0-1651414229983.png

 

Thanks

SwathiPulagam
Support

@adbdkb Anyone in the community can accept a solution if they feel it is appropriate. Any question posted on an accepted solution can still be answered, as long as it is relevant to the same subject.

The three items below are different predefined variables. In your case, you only need $topic to route messages generated by different tables to different topics.

$topic - To write messages on a specific topic.

$partition - To write messages to a specific partition.

$key - To create a custom message key.

 

Yes. Before you use this transformation, either topics with those names must already exist, or the user you specified in the Replicate Kafka endpoint must have permission to create topics.

 

Thanks,

Swathi

 

adbdkb
Creator
Author

OK, thanks @SwathiPulagam. I am OK with the answer being accepted as long as questions related to the original question are looked at.

So, in my case, the error shows this topic name, without the schema and table prepended:

Failed to produce kafka message with record id <2> to partition <-1> in topic '__Qlik_Topic'. Broker: Unknown topic or partition

 

This is how I have defined the topic name

Column name: $topic Column data type: STRING(50) Computation expression: $AR_M_SOURCE_SCHEMA||"_"||$AR_M_SOURCE_TABLE_NAME||"_Qlik_Topic"

 

What am I missing in my definition of the topic?

Thanks
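
One observation, offered as a sketch rather than a diagnosis: the topic in the error, '__Qlik_Topic', is exactly what the concatenation yields if both metadata variables evaluate to empty strings. The Python below only mimics the string concatenation in the expression. It is not Replicate's expression engine, and build_topic plus the sample names HR and EMPLOYEES are hypothetical.

```python
# Hypothetical helper that mirrors the concatenation in the $topic expression:
#   $AR_M_SOURCE_SCHEMA || "_" || $AR_M_SOURCE_TABLE_NAME || "_Qlik_Topic"
def build_topic(schema: str, table: str, suffix: str = "_Qlik_Topic") -> str:
    return schema + "_" + table + suffix


# Assumed sample values, not taken from the thread.
print(build_topic("HR", "EMPLOYEES"))  # HR_EMPLOYEES_Qlik_Topic

# Both inputs empty: reproduces the topic name seen in the error message.
print(build_topic("", ""))  # __Qlik_Topic
```

If that is what is happening, the thing to check would be why the schema and table name variables are empty at the point where the expression is evaluated.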

Steve_Nguyen
Support

@adbdkb

 

1. I assume that this is a fresh task. Try setting TARGET_LOAD logging to Verbose to see more detail about the error.

 

2. For your transformation: what topic are you writing to? I ask because $topic is for writing messages to a specific topic.

avidary_qlik
Support

Hi @adbdkb 

Can you please check the file C:\kafka_xxx\config\server.properties

Search for: auto.create.topics.enable=true

Check whether this line exists in the file.

If not, add the line below, save the server.properties file, restart Kafka, and then run your Replicate task with Reload Target.

auto.create.topics.enable=true

 

This should do the work.
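
The check described above can be sketched in Python. This is a minimal illustration, not a Kafka tool: it only reports whether the properties text contains an explicit, uncommented auto.create.topics.enable=true line. The function name explicit_auto_create_enabled and the sample text are hypothetical. To check a real file, read its contents and pass the text in.

```python
def explicit_auto_create_enabled(properties_text: str) -> bool:
    """Return True if the last uncommented auto.create.topics.enable entry is 'true'.

    Only explicit entries are inspected; a missing line returns False here,
    which says nothing about the broker's built-in default.
    """
    value = None
    for raw in properties_text.splitlines():
        line = raw.strip()
        # Skip blank lines and properties-style comments.
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, val = line.partition("=")
        if sep and key.strip() == "auto.create.topics.enable":
            value = val.strip().lower()  # a later entry overrides an earlier one
    return value == "true"


# Assumed sample content, not a real server.properties file.
sample = """# broker settings
broker.id=0
auto.create.topics.enable=true
"""
print(explicit_auto_create_enabled(sample))
```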

 

Thank you

Avidar

adbdkb
Creator
Author

>> 2. for your transformation: what topic are you writing to?  because : $topic - To write messages on a specific topic. 

Yes, I am trying to write to $topic. I first tried a single table to a single topic for testing. That worked.

Next, I am trying multiple tables to individual topics, and for that I have defined $topic as

$AR_M_SOURCE_SCHEMA||"_"||$AR_M_SOURCE_TABLE_NAME||"_Qlik_Topic"

As it shows here

Column name: $topic Column data type: STRING(50) Computation expression: $AR_M_SOURCE_SCHEMA||"_"||$AR_M_SOURCE_TABLE_NAME||"_Qlik_Topic"
 
My question is: is the above definition valid for associating a topic name with a table this way? I realize that if the user ID doesn't have permission to create topics, I will have to pre-create them. But because the error didn't show the schema name and the table name prepended, I wanted to verify that my definition is correct.
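
For the pre-creation route, here is a rough sketch of generating one kafka-topics.sh command per expected topic. The topic names, the localhost:9092 bootstrap address, and the partition and replication counts are all assumptions for illustration, and create_topic_commands is a hypothetical helper. It only builds command strings and does not contact a broker.

```python
import shlex


def create_topic_commands(topics, bootstrap="localhost:9092",
                          partitions=1, replication=1):
    """Build one 'kafka-topics.sh --create' command line per topic name."""
    commands = []
    for topic in topics:
        args = [
            "kafka-topics.sh", "--create",
            "--topic", topic,
            "--bootstrap-server", bootstrap,
            "--partitions", str(partitions),
            "--replication-factor", str(replication),
        ]
        # shlex.join quotes any argument that needs it for a POSIX shell.
        commands.append(shlex.join(args))
    return commands


# Assumed schema/table pairs following the <schema>_<table>_Qlik_Topic pattern.
expected_topics = ["HR_EMPLOYEES_Qlik_Topic", "HR_DEPARTMENTS_Qlik_Topic"]
for command in create_topic_commands(expected_topics):
    print(command)
```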
 
Thanks
 
adbdkb
Creator
Author

Hi, any idea on the definition?  Is it a valid definition for associating topic name with a table?

 

Column name: $topic Column data type: STRING(50) Computation expression: $AR_M_SOURCE_SCHEMA||"_"||$AR_M_SOURCE_TABLE_NAME||"_Qlik_Topic"

 

avidary_qlik
Support

Hello @adbdkb 

Replicate provides you with 2 options:

  • Keep the topic name the same as the table name.
  • Use a transformation to create your own topic name, for example:

(Add column
for %.%
Column '$topic',
Value: "your_preferred_prefix" || $AR_M_SOURCE_TABLE_NAME || "your_preferred_suffix")

Note that Kafka topic names cannot exceed 255 characters (249 from Kafka 0.10) and can only contain the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), - (minus).
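
The naming rules quoted above can be checked before a task runs. The following Python is a minimal sketch of just those two rules (length and allowed characters), using the 249-character limit. The function name is_valid_topic_name is hypothetical, and a broker may enforce additional rules not covered here.

```python
import re

MAX_TOPIC_LENGTH = 249  # limit quoted above for Kafka 0.10 and later
_ALLOWED = re.compile(r"[A-Za-z0-9._-]+")  # letters, digits, dot, underscore, minus


def is_valid_topic_name(name: str) -> bool:
    """Check a candidate topic name against the length and character rules."""
    return len(name) <= MAX_TOPIC_LENGTH and _ALLOWED.fullmatch(name) is not None


# Assumed sample names for illustration.
print(is_valid_topic_name("HR_EMPLOYEES_Qlik_Topic"))
print(is_valid_topic_name("name with spaces"))
```

A prefix or suffix containing spaces would fail this check, so underscores or dots are the safer separators in a $topic expression.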

 

Thank you

Avidar