Reputation: 87
I tried to fetch my data from a topic processed with KSQL, but it is not working.
I set up a table called api_table using KSQL. Here are the details of my table.
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
api_log | true | 1 | 1 | 1 | 1
API_STREAM | false | 1 | 1 | 0 | 0
API_STREAM_KEY| true | 1 | 1 | 1 | 1
API_TABLE | true | 1 | 1 | 0 | 0
mysql-config | false | 1 | 1 | 0 | 0
mysql-offsets | false | 25 | 1 | 0 | 0
mysql-status | false | 5 | 1 | 0 | 0
-------------------------------------------------------------------------------------------
And this is my table's format.
ksql> describe extended api_table;
Name : API_TABLE
Type : TABLE
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : API_TABLE (partitions: 1, replication: 1)
Field | Type
----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | BIGINT
COUNT | BIGINT
----------------------------------------
Queries that write into this TABLE
-----------------------------------
CTAS_API_TABLE_2 : CREATE TABLE API_TABLE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'API_TABLE') AS SELECT
WINDOWSTART() "KSQL_COL_0"
, COUNT(*) "COUNT"
FROM API_STREAM_KEY API_STREAM_KEY
WINDOW TUMBLING ( SIZE 5 MINUTES )
GROUP BY API_STREAM_KEY.METRIC;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.10 total-messages: 249 last-message: 2019-08-13T07:07:39.325Z
(Statistics of the local KSQL server interaction with the Kafka topic API_TABLE)
Everything seems to be working fine, and I can even print out the messages.
However, when I try to consume the messages with Python, it fails.
from confluent_kafka import KafkaError
import io
from confluent_kafka import Consumer
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9021',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'abcd'
})

consumer.subscribe(['API_TABLE'])

while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

consumer.close()
It shows this error. Why?
Traceback (most recent call last):
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 149, in poll
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 225, in decode_message
raise SerializerError("message does not start with magic byte")
confluent_kafka.avro.serializer.SerializerError: message does not start with magic byte
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/ylee/PycharmProjects/Bokeh/consumer.py", line 18, in <module>
msg = consumer.poll(10)
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 156, in poll
e))
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at API_TABLE [0] offset 110: message does not start with magic byte
Upvotes: 1
Views: 8753
Reputation:
Confluent Avro isn't plain Avro: each message is prefixed with a magic byte and a schema ID, so the payload cannot be read by libraries that assume standard Avro serialization. See https://github.com/confluentinc/ksql/issues/1282. You may be able to solve the issue by using the Confluent libraries to encode/decode: https://github.com/confluentinc/confluent-kafka-python
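For context, here is a minimal sketch (not from the answer; the broker and topic are taken from the question, the group id and settings are made up) that peeks at the raw key and value bytes to check for the Confluent wire format, i.e. a 0x00 magic byte followed by a 4-byte big-endian schema ID and then the Avro body:

import struct
from confluent_kafka import Consumer

# Hypothetical consumer purely for inspecting raw bytes
c = Consumer({'bootstrap.servers': 'localhost:9021',
              'group.id': 'wire-format-peek',
              'auto.offset.reset': 'earliest'})
c.subscribe(['API_TABLE'])

msg = c.poll(30)
if msg is not None and not msg.error():
    for part, raw in (('key', msg.key()), ('value', msg.value())):
        if raw and len(raw) > 5 and raw[0] == 0:
            # Confluent wire format: magic byte 0x00 + 4-byte schema ID + Avro body
            schema_id = struct.unpack('>I', raw[1:5])[0]
            print(part, 'looks like Confluent Avro, schema id', schema_id)
        else:
            print(part, 'is not Confluent-Avro encoded:', raw)
c.close()

In this case the value should show a schema ID, while the key is a plain string, which is exactly why an AvroConsumer fails on the key.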
Upvotes: -1
Reputation: 87
I ended up solving it myself. I modified things a little to suit my needs; here is the workaround.
import io
import struct

from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

# Please adjust the broker and schema registry URLs for your environment

# KAFKA BROKER URL
consumer = Consumer({
    'bootstrap.servers': 'localhost:9021',
    'group.id': 'abcd'
})

# SCHEMA REGISTRY URL
register_client = CachedSchemaRegistryClient(url="http://localhost:8081")

consumer.subscribe(['YOUR TOPIC'])

MAGIC_BYTES = 0


def unpack(payload):
    magic, schema_id = struct.unpack('>bi', payload[:5])

    # Avro value: magic byte + schema ID header, look the schema up in the registry
    if magic == MAGIC_BYTES:
        schema = register_client.get_by_id(schema_id)
        reader = DatumReader(schema)
        output = BinaryDecoder(io.BytesIO(payload[5:]))
        return reader.read(output)
    # Plain string key
    else:
        # For a windowed KSQL key, strip the window timestamp appended to the key:
        # payload[:-8].decode()
        return payload.decode()


def get_data():
    while True:
        try:
            msg = consumer.poll(10)
        except SerializerError as e:
            print("Message deserialization failed for {}: {}".format(msg, e))
            raise

        if msg is None:
            continue

        if msg.error():
            print("AvroConsumer error: {}".format(msg.error()))
            return

        key, value = unpack(msg.key()), unpack(msg.value())
        print(key, value)


if __name__ == '__main__':
    get_data()
For more detail on why this happens, you can read about it on my blog.
Upvotes: 3
Reputation: 32090
The problem is that whilst KSQL writes the value as Avro, the key is a STRING.
This issue looks to be the same, with a proposed PR to fix: https://github.com/confluentinc/confluent-kafka-python/pull/650
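As an illustration (not from the linked PR), here is a minimal sketch of a consumer that Avro-decodes only the value and treats the key as a plain string, assuming a recent confluent-kafka-python that ships the DeserializingConsumer API (the AvroDeserializer argument order has changed across versions); broker and registry addresses are taken from the question, and note that a windowed table key may still carry trailing window bytes, as the workaround above points out:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9021',
    'group.id': 'abcd',
    'auto.offset.reset': 'earliest',
    # Key is a plain string; only the value is Confluent-Avro encoded
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': AvroDeserializer(schema_registry)
})
consumer.subscribe(['API_TABLE'])

while True:
    msg = consumer.poll(10)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print(msg.key(), msg.value())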
Upvotes: 1