Reputation: 1356
Newbie playing with Kafka and Avro.
I am trying to deserialise Avro messages in Python 3.7.3 using the `kafka-python` and `avro-python3` packages, following this answer.
The function responsible for decoding the Kafka messages is:

```python
def decode_message(msg_value, reader):
    from io import BytesIO
    from avro.io import BinaryDecoder

    message_bytes = BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict
```
where `reader` is an `avro.io.DatumReader` instance:

```python
def create_reader(filename_path):
    from avro.io import DatumReader
    import avro.schema

    schema = avro.schema.Parse(open(filename_path).read())
    reader = DatumReader(schema)
    return reader
```
Unfortunately, it fails. This is the traceback:
```
<_io.BytesIO object at 0x7fab73fe5530>
<avro.io.BinaryDecoder object at 0x7fab74300090>
Traceback (most recent call last):
  File "app.py", line 19, in <module>
    kfk.read_messages(kafka_consumer, avro_reader)
  File "/app/modules/consume_kafka.py", line 17, in read_messages
    decoded_message = decode_message(msg_value, reader)
  File "/app/modules/consume_kafka.py", line 50, in decode_message
    event_dict = reader.read(decoder)
  File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 489, in read
    return self.read_data(self.writer_schema, self.reader_schema, decoder)
  File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 534, in read_data
    return self.read_record(writer_schema, reader_schema, decoder)
  File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 734, in read_record
    field_val = self.read_data(field.type, readers_field.type, decoder)
  File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 512, in read_data
    return decoder.read_utf8()
  File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 257, in read_utf8
    input_bytes = self.read_bytes()
  File "/usr/local/lib/python3.7/site-packages/avro/io.py", line 249, in read_bytes
    assert (nbytes >= 0), nbytes
AssertionError: -40
```
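The `-40` in the `AssertionError` can be reproduced by hand. Per the traceback, `DatumReader.read` starts with the record's first field, `name` (a `string`), and Avro prefixes a string with its length encoded as a zig-zag varint. The message printed further down starts with the byte `O` (`0x4F`), and that single byte decodes to `-40`. A minimal stdlib sketch of the decoding; `read_long` is a hypothetical helper written for illustration, not part of the `avro` package:

```python
def read_long(data: bytes, pos: int = 0) -> tuple:
    """Decode one Avro zig-zag varint long starting at data[pos].

    Returns (value, next_pos).
    """
    shift = 0
    raw = 0
    while True:
        byte = data[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last varint byte
            break
        shift += 7
    # Zig-zag decoding maps 0, 1, 2, 3, ... back to 0, -1, 1, -2, ...
    value = (raw >> 1) ^ -(raw & 1)
    return value, pos


# First bytes of the Kafka message value from the question.
msg_value = b"Obj\x01\x04\x14avro.codec"
length, _ = read_long(msg_value)
print(length)  # -40
```

In other words, the reader is interpreting the first byte of the payload as a string length, which suggests the payload is not the bare record the `DatumReader` expects.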
I am able to read the message and it looks like:

```
b'Obj\x01\x04\x14avro.codec\x08null\x16avro.schema\xbe\t{"type":"record","name":"tracks","namespace":"integration","fields":[{"name":"name","type":"string"},{"name":"data","type":[{"type":"record","name":"track_upload_verified","namespace":"integration.tracks","fields":[{"name":"track_id","type":"string"},{"name":"audio_filename","type":"string"},{"name":"track_type","type":"string"}]},{"type":"record","name":"audio_processed","namespace":"integration.tracks","fields":[{"name":"track_id","type":"string"},{"name":"audio_mp3_filename","type":"string"},{"name":"waveform_samples","type":{"type":"array","items":"int"}},{"name":"duration","type":"string"}]}]}]}\x00\xc4\x8ad\xceF\x9c\xef\x99\n}#y7\x96\xba\xb4\x02\xe2\x01*track_upload_verified\x00H341aa6a3-5ecb-4ac0-8f27-bc2fe5abc9d4^tracks-audio/-1khgyI4kYfSf8hq2XiXZjg-1569510465\x08main\xc4\x8ad\xceF\x9c\xef\x99\n}#y7\x96\xba\xb4'
```

which is what I expect, i.e. raw bytes.
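The first four bytes, `Obj\x01`, are the magic number of an Avro object container file: a header holding metadata (`avro.codec`, `avro.schema`) and a 16-byte sync marker precedes the encoded records, and the same 16-byte marker (`\xc4\x8ad\xce...\xba\xb4`) closes the dump. A `BinaryDecoder` plus `DatumReader` expects a bare datum with no header, so it decodes the header as if it were the `tracks` record. A minimal stdlib sketch that walks this header, following the container layout in the Avro specification; `read_long` and `read_container_header` are hypothetical helpers for illustration, and the example header is a toy one, not the question's payload:

```python
MAGIC = b"Obj\x01"  # Avro object container file magic


def read_long(data: bytes, pos: int) -> tuple:
    """Decode one zig-zag varint long at data[pos]; return (value, next_pos)."""
    shift = raw = 0
    while True:
        byte = data[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def read_container_header(data: bytes) -> tuple:
    """Parse an Avro container header.

    Returns (metadata, sync_marker, offset_of_first_data_block).
    """
    if data[:4] != MAGIC:
        raise ValueError("not an Avro container file (maybe a bare datum?)")
    pos = 4
    metadata = {}
    while True:  # metadata is an Avro map<bytes>, encoded as blocks
        count, pos = read_long(data, pos)
        if count == 0:  # a zero count terminates the map
            break
        if count < 0:  # negative count: a block size in bytes follows
            count = -count
            _, pos = read_long(data, pos)
        for _ in range(count):
            key_len, pos = read_long(data, pos)
            key = data[pos:pos + key_len].decode("utf-8")
            pos += key_len
            val_len, pos = read_long(data, pos)
            metadata[key] = data[pos:pos + val_len]
            pos += val_len
    sync = data[pos:pos + 16]
    return metadata, sync, pos + 16


header = (
    b"Obj\x01"                      # magic
    b"\x04"                         # map block with 2 entries (zig-zag of 2)
    b"\x14avro.codec\x08null"       # length-prefixed key and value
    b"\x16avro.schema" b'\x10"string"'
    b"\x00"                         # end of map
    + b"\xaa" * 16                  # sync marker
)
meta, sync, offset = read_container_header(header)
print(meta)  # {'avro.codec': b'null', 'avro.schema': b'"string"'}
```

Run against the full message from the question, the same function should report `b'null'` for `avro.codec` and the `tracks` schema JSON for `avro.schema`, which points to reading the value with a container-file reader rather than a bare `DatumReader`.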
I am quite sure about the schema as I validated it using this tool.
Has anyone had a similar issue?
Upvotes: 15
Views: 1171
Reputation: 134
`avro-python3` is now deprecated in favor of `avro`, but I'd suggest using `fastavro` instead:
```python
import io

from fastavro import schemaless_reader, schemaless_writer
from kafka import KafkaConsumer, KafkaProducer


def serialize(dict_data: dict, servo_schema: dict) -> bytes:
    """
    Serialize `dict_data` using `servo_schema` with Apache Avro.
    """
    with io.BytesIO() as bytes_writer:
        schemaless_writer(bytes_writer, servo_schema, dict_data)
        serialized_data = bytes_writer.getvalue()
    return serialized_data


def deserialize(serialized_data: bytes, servo_schema: dict) -> dict:
    """
    Deserialize `serialized_data` using `servo_schema` with Apache Avro.
    """
    # Wrap the bytes directly; no need to write and seek back to 0.
    with io.BytesIO(serialized_data) as bytes_reader:
        message = schemaless_reader(bytes_reader, servo_schema)
    return message


# ...
serialized_message = serialize(message, schema)

# Send through topic.
producer = KafkaProducer(bootstrap_servers=["localhost:1234"])
producer.send("my-topic", value=serialized_message)
producer.close()  # flushes pending sends before closing

# Read from topic.
consumer = KafkaConsumer("my-topic", bootstrap_servers=["localhost:1234"])
for message in consumer:
    deserialized_message = deserialize(message.value, schema)
```
Upvotes: 1
Reputation: 361
Seems like corruption within the Avro file, but it's hard to tell based on your question and input.
Try the following:
Upvotes: 0