Itzblend

Reputation: 129

Kafka: All messages failing in stream while data in topics

I have a topic post_users_t, and when I use the PRINT command on it I get:

rowtime: 4/2/20 2:03:48 PM UTC, key: <null>, value: {"userid": 6, "id": 8, "title": "testest", "body": "Testingmoreand more"}
rowtime: 4/2/20 2:03:48 PM UTC, key: <null>, value: {"userid": 7, "id": 11, "title": "testest", "body": "Testingmoreand more"}

So then I create a stream out of this with:

CREATE STREAM userstream (userid INT, id INT, title VARCHAR, body VARCHAR)
    WITH (KAFKA_TOPIC='post_users_t',
          VALUE_FORMAT='JSON');

But I can't select anything from it, and when I run DESCRIBE EXTENDED on it, all the messages have failed:

consumer-messages-per-sec:      1.06 consumer-total-bytes:    116643 consumer-total-messages:      3417     last-message: 2020-04-02T14:08:08.546Z
consumer-failed-messages:      3417 consumer-failed-messages-per-sec:      1.06      last-failed: 2020-04-02T14:08:08.56Z

What am I doing wrong here?

Extra info below!

Print topic from beginning:

ksql> print 'post_users_t' from beginning limit 2;
Key format: SESSION(AVRO) or HOPPING(AVRO) or TUMBLING(AVRO) or AVRO or SESSION(PROTOBUF) or HOPPING(PROTOBUF) or TUMBLING(PROTOBUF) or PROTOBUF or SESSION(JSON) or HOPPING(JSON) or TUMBLING(JSON) or JSON or SESSION(JSON_SR) or HOPPING(JSON_SR) or TUMBLING(JSON_SR) or JSON_SR or SESSION(KAFKA_INT) or HOPPING(KAFKA_INT) or TUMBLING(KAFKA_INT) or KAFKA_INT or SESSION(KAFKA_BIGINT) or HOPPING(KAFKA_BIGINT) or TUMBLING(KAFKA_BIGINT) or KAFKA_BIGINT or SESSION(KAFKA_DOUBLE) or HOPPING(KAFKA_DOUBLE) or TUMBLING(KAFKA_DOUBLE) or KAFKA_DOUBLE or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 4/2/20 1:04:08 PM UTC, key: <null>, value: {"userid": 1, "id": 1, "title": "loremit", "body": "loremit heiluu ja paukkuu"}
rowtime: 4/2/20 1:04:08 PM UTC, key: <null>, value: {"userid": 2, "id": 2, "title": "lorbe", "body": "larboloilllaaa"}

Upvotes: 0

Views: 381

Answers (1)

Robin Moffatt

Reputation: 32140

Per the output from ksqlDB's inspection of the topic, your data is serialised in Avro:

Value format: AVRO or KAFKA_STRING

but you have created the STREAM specifying VALUE_FORMAT='JSON'. This will result in deserialisation errors, which you'll see written out in the server log (e.g. with docker-compose logs -f ksqldb-server) when you try to query the stream.

Since you're using Avro, you don't need to specify the schema. Try this instead:

CREATE STREAM userstream 
   WITH (KAFKA_TOPIC='post_users_t',
         VALUE_FORMAT='AVRO');
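
With VALUE_FORMAT='AVRO', ksqlDB reads the column definitions from the schema registered in the Schema Registry, so no explicit column list is needed. As a quick check (assuming a recent ksqlDB version with push-query syntax), you should then be able to query the stream, e.g.:

SELECT * FROM userstream EMIT CHANGES LIMIT 2;

If the failed-message counter in DESCRIBE EXTENDED stops climbing and the query returns rows, the format mismatch was the problem.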

Upvotes: 2
