Chirag

Reputation: 56

Confluent and Cassandra: Getting DataException: Failed to deserialize data to Avro, Unknown magic byte

I followed the tutorial at http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ and was able to insert data from the Avro console producer into Cassandra. Now I am trying to extend this setup to use Flume: Flume is set up on my machine to pick up a log file and push its contents to Kafka, from where the sink should insert the data into the Cassandra database. I am putting the following data in a text file:

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}

{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5}

{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000}

{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150}

Flume picks up that data and pushes it to Kafka.

In the Cassandra sink, I am facing this error:

ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-09-28 15:47:00,951] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-09-28 15:47:00,951] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
[2016-09-28 15:47:00,952] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)
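For context on the two Caused by lines: the Connect AvroConverter only accepts records in the Confluent wire format, that is a magic byte 0x00, then a 4-byte Schema Registry ID, then the Avro-encoded payload. A record that starts with any other byte (for example raw JSON text) produces exactly this "Unknown magic byte!" error. A quick way to inspect what actually landed on the topic (a sketch, assuming a local broker and that xxd is installed; older Kafka releases use --zookeeper localhost:2181 instead of --bootstrap-server):

# Dump the raw bytes of the first record on the topic.
# Confluent-Avro records begin with 0x00; plain JSON begins with 0x7b ("{").
./confluent/bin/kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic orders-topic --from-beginning --max-messages 1 | xxd | head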

The schema I am using, passed to the Avro console producer:

./confluent/bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic orders-topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}]}'
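Producing through this console producer registers the schema with the Schema Registry and wraps every message in the wire format described above, which is why the tutorial path worked. To confirm that a topic contains decodable Avro records, the matching consumer can be used (a sketch; the --zookeeper flag and the default Schema Registry port are assumptions for the Confluent 3.x tooling of that era):

./confluent/bin/kafka-avro-console-consumer \
  --zookeeper localhost:2181 \
  --topic orders-topic \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081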

Config for Flume: Flume-kafka.conf.properties

agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink


agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576

agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = orders-topic
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.batchSize = 20
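Note that Flume's KafkaSink publishes each event body verbatim as the message value, so with this config the topic receives the plain JSON lines from the file, not Confluent-Avro records. If Flume stays in the pipeline, one possible workaround is to run the Connect worker with the JsonConverter instead of the AvroConverter (a sketch; whether the Cassandra sink then accepts schema-less JSON depends on the connector version):

# Connect worker properties (sketch): decode plain JSON message values.
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false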

Can anyone please help me fix this error?

Upvotes: 1

Views: 1316

Answers (1)

dawsaw

Reputation: 2313

Generally, an unknown magic byte means your Kafka client and server versions are incompatible. Check that your version of the Cassandra sink was built with Kafka client libraries at a version less than or equal to your brokers'.
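One way to check that (a sketch; both paths are assumptions for a Confluent tarball install, adjust to wherever the connector jars actually live):

# kafka-clients jar bundled with the Cassandra sink connector
ls ./confluent/share/java/kafka-connect-cassandra/ | grep -i kafka-clients
# kafka-clients jar the broker itself runs with
ls ./confluent/share/java/kafka/ | grep -i kafka-clients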

Upvotes: 0
