Reputation: 89
I have a flow in which records from IBM mainframe IIDR are sent to a Kafka topic. The value_format of the messages arriving on the Kafka topic is AVRO, and the key is in AVRO format too. The records are pushed into the Kafka topic, and I have a stream associated with that topic, but the records are not passed into the stream.
Example of a record in the test_iidr topic:
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
The value_format in the stream is AVRO, and I have checked all the column names.
The stream creation query:
CREATE STREAM test_iidr (
col1 STRING,
col2 DECIMAL(2,0),
col3 DECIMAL(1,0),
iidr_tran_type STRING,
iidr_a_ccid STRING,
iidr_a_user STRING,
iidr_src_upd_ts STRING,
iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');
Is it failing to load into the stream from the topic because the KEY is not mentioned in the WITH clause?
The Schema Registry has the test_iidr-value and test_iidr-key subjects registered in it.
The key.converter and value.converter in the Kafka Connect Docker container are both set to org.apache.kafka.connect.json.JsonConverter. Is this JsonConverter causing the issue?
I created a completely different pipeline with a different stream and inserted the same data manually using INSERT INTO statements. It worked. Only the IIDR flow is not working: the records are not pushed into the stream from the topic.
I am using Confluent Kafka version 5.5.0.
Upvotes: 0
Views: 156
Reputation: 1893
The JsonConverter in the Connect config could well be converting your Avro data to JSON.
To determine the key and value serialization formats, you can use the PRINT command (which I can see you've already run). PRINT outputs the key and value formats when it runs. For example:
ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}
So the first thing to check is the formats that PRINT reports for the key and value, and then update your CREATE statement accordingly.
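As a rough sketch of what that update might look like, assuming PRINT reports JSON for the value (an assumption, since the actual format lines are not shown in the question), the stream could be re-declared with only the format changed. The columns are copied from the question as-is; whether your ksqlDB version accepts DECIMAL columns with the JSON format is worth checking.

```sql
-- Assumes PRINT showed "Value format: JSON"; adjust if it reports something else.
-- Dropping the stream removes only the ksqlDB definition, not the Kafka topic.
DROP STREAM test_iidr;

CREATE STREAM test_iidr (
  col1 STRING,
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING)
WITH (KAFKA_TOPIC='test_iidr', VALUE_FORMAT='JSON');
```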
Note: ksqlDB does not yet support Avro/JSON keys, so you may want or need to repartition your data. See: https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-to-do-if-your-key-is-not-set-or-is-in-a-different-format
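A minimal sketch of such a repartition, assuming the test_iidr stream already reads the values correctly and that col1 is the column you want as the key. The name test_iidr_by_col1 is hypothetical, and choosing col1 is an assumption, not something stated in the question.

```sql
-- Hypothetical target stream; col1 as the new key is an assumption.
-- ksqlDB writes the result to a new topic keyed by col1.
CREATE STREAM test_iidr_by_col1 AS
  SELECT *
  FROM test_iidr
  PARTITION BY col1;
```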
Side note: if the schema for the value is stored in the Schema Registry, then you don't need to define the columns in your CREATE statement, as ksqlDB will load the columns from the Schema Registry.
Side note: you don't need PARTITIONS=1, REPLICAS=3 in the WITH clause for existing topics; they are only needed if you want ksqlDB to create the topic for you.
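Putting the two side notes together, a shorter statement along these lines may be enough. This is only a sketch: it assumes the value really is Avro and that its schema is registered under the test_iidr-value subject mentioned in the question.

```sql
-- Assumes the topic already exists and the value schema is in the Schema Registry.
-- No column list: ksqlDB loads the columns from the registered value schema.
-- No PARTITIONS/REPLICAS: those only matter when ksqlDB creates the topic.
CREATE STREAM test_iidr
  WITH (KAFKA_TOPIC='test_iidr', VALUE_FORMAT='AVRO');
```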
Upvotes: 1