Lee Yee Run

Reputation: 87

Kafka: Message does not start with magic byte

I am trying to fetch data from a topic I processed with KSQL, but it is not working.

I set up a table called api_table using KSQL. Here are the details of my table.

ksql> show topics;

 Kafka Topic   | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-------------------------------------------------------------------------------------------
 _schemas      | false      | 1          | 1                  | 0         | 0              
 api_log       | true       | 1          | 1                  | 1         | 1              
 API_STREAM    | false      | 1          | 1                  | 0         | 0              
 API_STREAM_KEY| true       | 1          | 1                  | 1         | 1              
 API_TABLE     | true       | 1          | 1                  | 0         | 0              
 mysql-config  | false      | 1          | 1                  | 0         | 0              
 mysql-offsets | false      | 25         | 1                  | 0         | 0              
 mysql-status  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------

And this is my table format.

ksql> describe extended api_table;

Name                 : API_TABLE
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : API_TABLE (partitions: 1, replication: 1)

 Field      | Type                      
----------------------------------------
 ROWTIME    | BIGINT           (system) 
 ROWKEY     | VARCHAR(STRING)  (system) 
 KSQL_COL_0 | BIGINT                    
 COUNT      | BIGINT                    
----------------------------------------

Queries that write into this TABLE
-----------------------------------
CTAS_API_TABLE_2 : CREATE TABLE API_TABLE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'API_TABLE') AS SELECT
  WINDOWSTART() "KSQL_COL_0"
, COUNT(*) "COUNT"
FROM API_STREAM_KEY API_STREAM_KEY
WINDOW TUMBLING ( SIZE 5 MINUTES ) 
GROUP BY API_STREAM_KEY.METRIC;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.10   total-messages:       249     last-message: 2019-08-13T07:07:39.325Z

(Statistics of the local KSQL server interaction with the Kafka topic API_TABLE)

Everything works fine, and I can even print out the messages in KSQL.

However, it fails when I try to consume the messages with Python:

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9021',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'abcd'
})

consumer.subscribe(['API_TABLE'])

msg = None
while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

consumer.close()

It throws this error. Why?

Traceback (most recent call last):
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 149, in poll
    decoded_key = self._serializer.decode_message(message.key(), is_key=True)
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 225, in decode_message
    raise SerializerError("message does not start with magic byte")
confluent_kafka.avro.serializer.SerializerError: message does not start with magic byte

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ylee/PycharmProjects/Bokeh/consumer.py", line 18, in <module>
    msg = consumer.poll(10)
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 156, in poll
    e))
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at API_TABLE [0] offset 110: message does not start with magic byte

Upvotes: 1

Views: 8753

Answers (3)

user3237183

Reputation:

Confluent's Avro format is not plain Avro: each message is prefixed with a magic byte and a 4-byte schema ID before the Avro-encoded payload, so it cannot be read by libraries that expect standard Avro bytes.

See https://github.com/confluentinc/ksql/issues/1282. You can solve the issue by using the Confluent libraries to encode/decode: https://github.com/confluentinc/confluent-kafka-python
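
To make that framing concrete, here is a minimal sketch of the wire format (the helper name and the example bytes are made up; the layout matches the struct.unpack('>bi', ...) call in the workaround answer below):

import struct

# Confluent wire format: 1 magic byte (0x00), a 4-byte big-endian schema ID,
# then the Avro-encoded payload.
def split_confluent_message(raw):
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("message does not start with magic byte")
    magic, schema_id = struct.unpack('>bi', raw[:5])
    # Return the schema ID to look up in the registry, plus the Avro bytes
    return schema_id, raw[5:]

# Hypothetical message: schema ID 1 followed by some Avro bytes.
schema_id, avro_bytes = split_confluent_message(b'\x00\x00\x00\x00\x01' + b'\x02abc')
print(schema_id)  # 1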

Upvotes: -1

Lee Yee Run

Reputation: 87

I managed to solve it myself. I modified things a little to suit my needs; here is the workaround:

import io
import struct

from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

# Please adjust your server and url

# KAFKA BROKER URL
consumer = Consumer({
    'bootstrap.servers': 'localhost:9021',
    'group.id': 'abcd'
})

# SCHEMA URL 
register_client = CachedSchemaRegistryClient(url="http://localhost:8081")
consumer.subscribe(['YOUR TOPIC'])

MAGIC_BYTES = 0


def unpack(payload):
    magic, schema_id = struct.unpack('>bi', payload[:5])

    # Get Schema registry
    # Avro value format
    if magic == MAGIC_BYTES:
        schema = register_client.get_by_id(schema_id)
        reader = DatumReader(schema)
        output = BinaryDecoder(io.BytesIO(payload[5:]))
        abc = reader.read(output)
        return abc
    # String key
    else:
        # If KSQL payload, exclude timestamp which is inside the key. 
        # payload[:-8].decode()
        return payload.decode()


def get_data():
    msg = None
    while True:
        try:
            msg = consumer.poll(10)
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            raise

        # poll() returns None when no message arrived within the timeout
        if msg is None:
            continue

        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            return

        key, value = unpack(msg.key()), unpack(msg.value())
        print(key, value)

if __name__ == '__main__':
    get_data()
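
Note that API_TABLE is a windowed table (the CTAS uses WINDOW TUMBLING), so its key is the GROUP BY value with the 8-byte window start appended; hence the commented-out payload[:-8].decode() in unpack(). A small sketch of that key layout (the metric name and timestamp here are hypothetical):

import struct

raw_key = b'cpu_usage' + struct.pack('>q', 1565680500000)   # hypothetical windowed key
metric = raw_key[:-8].decode()                        # 'cpu_usage'
window_start = struct.unpack('>q', raw_key[-8:])[0]   # window start in epoch millis
print(metric, window_start)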

For more detail on why this happens, you can read my blog.

Upvotes: 3

Robin Moffatt

Reputation: 32090

The problem is that whilst KSQL writes the value as Avro, the key is a STRING, so the AvroConsumer fails as soon as it tries to Avro-decode the key (note decode_message(message.key(), is_key=True) in your traceback).

This issue looks to be the same, with a proposed PR to fix: https://github.com/confluentinc/confluent-kafka-python/pull/650
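
Until that lands, one way around it is to avoid AvroConsumer and configure the deserializers yourself. A minimal sketch, assuming a recent confluent-kafka-python (the DeserializingConsumer API, 1.4+) and the broker/registry addresses from the question; leaving the key deserializer unset keeps the key as raw bytes, so the STRING key is never Avro-decoded:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9021',  # broker address from the question
    'group.id': 'abcd',
    'auto.offset.reset': 'earliest',
    # No 'key.deserializer': the key stays raw bytes; only the value is
    # decoded as Confluent Avro via Schema Registry.
    'value.deserializer': AvroDeserializer(schema_registry),
})

consumer.subscribe(['API_TABLE'])

while True:
    msg = consumer.poll(10)  # DeserializingConsumer raises on consume errors
    if msg is None:
        continue
    # This windowed table's key bytes end with the 8-byte window start
    # (see the workaround answer above), so strip it before decoding.
    key = msg.key()[:-8].decode('utf_8') if msg.key() else None
    print(key, msg.value())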

Upvotes: 1
