Reputation: 1921
I have a large CSV file and I want to write its rows to a Kafka topic.
import csv
from kafka import KafkaProducer

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()
This code produces an error:
AssertionError: value must be bytes
The file looks like:
"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22
Can anyone help me with this?
Upvotes: 4
Views: 8371
Reputation: 39840
You need to properly serialise your values: csv.DictReader yields plain Python dicts, but KafkaProducer only accepts bytes.
The following should do the trick:
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
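For reference, here is a minimal sketch of your full producer with that serialiser wired in. The function name produce_csv is just a rename to avoid shadowing the producer variable; the broker address, topic, file path, and delimiter are taken from your question, and flushing once after the loop instead of per message is only a small efficiency tweak.

import csv
import json
from kafka import KafkaProducer

def produce_csv():
    # value_serializer turns each dict into a JSON string and then into bytes,
    # which is what the producer expects for message values
    producer = KafkaProducer(
        bootstrap_servers='mykafka-broker',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            # each row is a dict like {"timestamp": "...", "name": "John", "age": "36"}
            producer.send(topic='stable_topic', value=row)
    # flush once after the loop so all buffered messages are actually delivered
    producer.flush()

if __name__ == '__main__':
    produce_csv()

One thing to double-check: the sample file you show looks comma-separated, so you may want delimiter="," rather than ";".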
Upvotes: 3