Generate a record to attrep_apply_exceptions topic for Kafka endpoint

KellyHobson

Last Update: Jan 21, 2022 4:37:18 AM
Updated By: Sonja_Bauernfeind
Created date: Jan 14, 2022 10:38:30 AM

Introduction:

This post describes how to test whether Replicate writes correctly to the attrep_apply_exceptions topic on a Kafka target endpoint. The test deliberately causes a task warning, so please use it with care.

When using the Kafka target endpoint, a topic named attrep_apply_exceptions must exist to capture errors that occur in the task. You can either create the topic before starting the replication task, or configure the brokers with auto.create.topics.enable=true in config/server.properties so that the topic is created automatically.
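For the first option, the topic can be created with the kafka-topics.sh script. The sketch below only assembles that command line so it can be reviewed before it is run. The function name, the broker address localhost:9092, and the partition and replication values are assumptions for a single-broker test setup. The --bootstrap-server flag is accepted by kafka-topics.sh in newer Kafka releases; older installations use --zookeeper instead, as in the commands shown later in this article.

```python
from typing import List


def build_create_topic_command(
    bootstrap_server: str = "localhost:9092",   # assumed broker address
    topic: str = "attrep_apply_exceptions",
    partitions: int = 1,                        # assumed test value
    replication_factor: int = 1,                # assumed single-broker setup
    script: str = "./kafka-topics.sh",
) -> List[str]:
    """Return the argument list for creating the exceptions topic."""
    return [
        script,
        "--create",
        "--bootstrap-server", bootstrap_server,
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]


if __name__ == "__main__":
    # Print the command instead of running it, so it can be checked first.
    print(" ".join(build_create_topic_command()))
```

To execute the command rather than print it, the returned list can be passed to subprocess.run from the Kafka bin directory.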

 

 

Steps:

  1. Create a CDC-only test task (Apply Changes processing on) that uses your usual source and Kafka as the target.
  2. Select a table and add a transformation column to the Output side with the following values:
     Name: $partition
     Type: NUMERIC(18,0)
     Fx: 11

  3. Save the task.
  4. Start processing.
  5. Once the task is running, insert a record or two into the source table.
  6. A data error is generated, and the failed record is written to the attrep_apply_exceptions topic.

 

Output on the Kafka server:

[root@replicate2 bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
attrep_apply_exceptions
productkels
[root@replicate2 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic attrep_apply_exceptions --from-beginning
{"magic": "atMSG", "type": "DT", "headers": null, "messageSchemaId": null, "messageSchema": null, "message": {"data": {"TASK_NAME": "clean_kafka_exceptions_test", "TABLE_OWNER": "", "TABLE_NAME": "productkels:11", "ERROR_TIME": "2022-01-14 15:11:40.700", "STATEMENT": "{\"magic\": \"atMSG\", \"type\": \"DT\", \"headers\": null, \"messageSchemaId\": null, \"messageSchema\": null, \"message\": {\"data\": {\"Employee_id\": 366, \"employee_first_name\": \"ty\", \"Employee_last_name\": \"mann\"}, \"beforeData\": null, \"headers\": {\"operation\": \"INSERT\", \"changeSequence\": \"20220114151140000000000000000000005\", \"timestamp\": \"2022-01-14T15:11:40.000\", \"streamPosition\": \"binlog.000014:15304:0:15354:60129557307:binlog.000014:15230\", \"transactionId\": \"00000000000000000000000E00003B3B\", \"changeMask\": \"07\", \"columnMask\": \"07\", \"transactionEventCounter\": 1, \"transactionLastEvent\": true}}}", "ERROR": "INVALID_PARTITION:Local: Unknown partition"}, "beforeData": null, "headers": {"operation": "INSERT", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "transactionEventCounter": null, "transactionLastEvent": null}}}
^CProcessed a total of 1 messages
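The record above is easier to read once it is unpacked. Its STATEMENT field is itself a JSON document stored as a string, so it has to be decoded a second time. The following is a minimal sketch of that, assuming the message layout shown in the output above. The function name and the abbreviated sample record are illustrative only and are not part of Replicate.

```python
import json

# Abbreviated copy of the record shown above, rebuilt for illustration.
# Only the fields used by the parser are kept.
_statement = {
    "magic": "atMSG",
    "type": "DT",
    "message": {
        "data": {"Employee_id": 366, "employee_first_name": "ty",
                 "Employee_last_name": "mann"},
        "beforeData": None,
        "headers": {"operation": "INSERT"},
    },
}
SAMPLE_RECORD = json.dumps({
    "magic": "atMSG",
    "type": "DT",
    "message": {
        "data": {
            "TASK_NAME": "clean_kafka_exceptions_test",
            "TABLE_NAME": "productkels:11",
            "ERROR_TIME": "2022-01-14 15:11:40.700",
            "STATEMENT": json.dumps(_statement),  # nested JSON stored as a string
            "ERROR": "INVALID_PARTITION:Local: Unknown partition",
        },
    },
})


def parse_exception_record(raw: str) -> dict:
    """Extract the key fields from one attrep_apply_exceptions message."""
    data = json.loads(raw)["message"]["data"]
    statement = json.loads(data["STATEMENT"])       # second decode
    code, _, detail = data["ERROR"].partition(":")  # split at the first colon
    return {
        "task": data["TASK_NAME"],
        "table": data["TABLE_NAME"],
        "error_code": code,
        "error_detail": detail,
        "operation": statement["message"]["headers"]["operation"],
        "row": statement["message"]["data"],
    }


if __name__ == "__main__":
    print(parse_exception_record(SAMPLE_RECORD))
```

In practice, each line printed by kafka-console-consumer.sh could be passed to parse_exception_record in place of SAMPLE_RECORD.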

 

Explanation:

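By default, Kafka creates a topic with a single partition (the broker setting num.partitions defaults to 1), so only partition 0 exists. You can confirm the partition count of a topic with this command: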

[root@replicate2 bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic yourtopicname

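Because the transformation sets $partition to 11, Replicate tries to write to partition 11 of the topic. That partition does not exist, so the write fails with an INVALID_PARTITION error and the record is sent to attrep_apply_exceptions.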
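The reasoning above can be checked against the output of the describe command. The sketch below reads the PartitionCount value from that output and tests whether a requested partition number is valid, keeping in mind that Kafka numbers partitions from 0. The sample describe text is an assumption about what the command prints for a one-partition topic, and both function names are hypothetical.

```python
import re

# Assumed example of `kafka-topics.sh --describe` output for a topic
# with one partition; the exact spacing can differ between Kafka versions.
SAMPLE_DESCRIBE = (
    "Topic: productkels\tPartitionCount: 1\tReplicationFactor: 1\tConfigs:\n"
    "\tTopic: productkels\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n"
)


def partition_count(describe_output: str) -> int:
    """Read the PartitionCount value from the describe output."""
    match = re.search(r"PartitionCount:\s*(\d+)", describe_output)
    if match is None:
        raise ValueError("PartitionCount not found in describe output")
    return int(match.group(1))


def partition_exists(partition: int, count: int) -> bool:
    """Partitions are numbered 0 .. count-1."""
    return 0 <= partition < count


if __name__ == "__main__":
    count = partition_count(SAMPLE_DESCRIBE)
    for requested in (0, 11):
        print(requested, partition_exists(requested, count))
```

With a single partition, the value 11 used in the transformation falls outside the valid range, which matches the error in the exceptions record.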

 

Environment

The information in this article is provided as-is and is to be used at your own discretion. Depending on the tool(s) used, customization(s), and/or other factors, ongoing support for the solution described here may not be provided by Qlik Support.

 
