Sanjay Nayak

Reputation: 89

Consumer_failed_message in kafka stream: Records not pushed from topic

I have a flow in which IBM mainframe IIDR sends records to a Kafka topic. Both the key and the value of the messages arriving on the topic are in AVRO format. The records reach the Kafka topic, and I have a stream associated with that topic, but the records are not passed into the stream. Example record from the test_iidr topic -

rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

The VALUE_FORMAT of the stream is AVRO, and I have checked that all the column names match.

The stream creation query -

CREATE STREAM test_iidr (
  col1 STRING, 
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING) 
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');

Is the stream failing to load from the topic because the KEY is not specified in the WITH clause? The Schema Registry has both the test_iidr-value and test_iidr-key subjects registered.

The key.converter and value.converter in the Kafka Connect Docker container are both set to org.apache.kafka.connect.json.JsonConverter. Could this JsonConverter be causing the issue?

I created a completely separate pipeline with a different stream and inserted the same data manually using INSERT INTO statements, and that worked. Only the IIDR flow fails: its records are not pushed from the topic into the stream.

I am using Confluent kafka version 5.5.0.

Upvotes: 0

Views: 156

Answers (1)

Andrew Coates

Reputation: 1893

The JsonConverter in the connect config could well be converting your Avro data to JSON.

To determine the key and value serialization formats, you can use the PRINT command (which I can see you've already run). PRINT outputs the detected key and value formats when it runs. For example:

ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

So the first thing to check is which formats PRINT reports for the key and the value, and then update your CREATE statement to match.
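
For illustration only: if PRINT reports JSON for the value, as in the example output above, the stream would have to be declared with VALUE_FORMAT='JSON'. The sketch below assumes that is the case and reuses the column definitions from the question. The stream name test_iidr_json is hypothetical, and the column types may need adjusting to what the JSON actually contains.

```sql
-- Assumption: PRINT reported "Value format: JSON" for the test_iidr topic.
-- test_iidr_json is a hypothetical stream name, not one from the question.
CREATE STREAM test_iidr_json (
  col1 STRING,
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', VALUE_FORMAT='JSON');

-- Read from the start of the topic to confirm that existing records are visible.
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM test_iidr_json EMIT CHANGES LIMIT 1;
```

If the SELECT returns the record shown by PRINT, the format mismatch was the cause.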

Note: ksqlDB does not yet support Avro or JSON keys, so you may need to repartition your data. See: https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-to-do-if-your-key-is-not-set-or-is-in-a-different-format
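
As a rough sketch of what such a repartition could look like, assuming the test_iidr stream from the question can read the value and that col1 is a suitable key (the derived stream name test_iidr_by_col1 is hypothetical):

```sql
-- Assumption: the source stream test_iidr deserializes the value correctly.
-- test_iidr_by_col1 is a hypothetical name for the re-keyed stream.
-- PARTITION BY writes a new topic whose key is taken from the col1 value column.
CREATE STREAM test_iidr_by_col1 AS
  SELECT *
  FROM test_iidr
  PARTITION BY col1;
```

Downstream queries would then read from the re-keyed stream rather than from the original topic.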

Side note: If the schema for the value is stored in the Schema Registry, you don't need to define the columns in your CREATE statement, as ksqlDB will load them from the Schema Registry.

Side note: You don't need PARTITIONS=1, REPLICAS=3 in the WITH clause for an existing topic. They are only needed if you want ksqlDB to create the topic for you.
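
Putting the two side notes together, a minimal version of the statement might look like the following. This assumes the value really is Avro and that its schema is registered under the test_iidr-value subject, as the question states:

```sql
-- Assumption: the value is Avro and its schema is registered in the
-- Schema Registry under test_iidr-value, so the columns are inferred.
-- PARTITIONS and REPLICAS are omitted because the topic already exists.
CREATE STREAM test_iidr
WITH (KAFKA_TOPIC='test_iidr', VALUE_FORMAT='AVRO');

-- Inspect the columns ksqlDB loaded from the registered schema.
DESCRIBE test_iidr;
```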

Upvotes: 1
