Reputation: 56
I followed the tutorial at http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ and I am able to insert data from the Avro console producer into Cassandra. Now I am trying to extend this to use Flume: I have Flume set up on my machine to pick up a log file and push it to Kafka, and I want that data inserted into my Cassandra database. I put the following data in a text file:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5}
{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000}
{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150}
Flume picks up that data and pushes it to Kafka.
The Cassandra sink then fails with this error:
ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-09-28 15:47:00,951] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-09-28 15:47:00,951] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
[2016-09-28 15:47:00,952] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)
The schema I am using, as passed to the Avro console producer:
./confluent/bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic orders-topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"}, {"name":"created", "type": "string"}, {"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'
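As a quick sanity check, the sample lines can be compared with the fields declared in this schema. The following is a minimal Python sketch, not a full Avro validator: it assumes Avro `int` maps to a Python `int` and that `double` accepts any JSON number, and the helper name `matches_schema` is hypothetical.

```python
import json

# The value schema from the producer command above.
SCHEMA = json.loads(
    '{"type":"record","name":"myrecord","fields":['
    '{"name":"id","type":"int"}, {"name":"created", "type": "string"}, '
    '{"name":"product", "type": "string"}, {"name":"price", "type": "double"}]}'
)

# Assumed mapping from Avro primitive names to acceptable Python types.
PY_TYPES = {"int": (int,), "string": (str,), "double": (int, float)}

def matches_schema(line: str) -> bool:
    """Check that a JSON line has exactly the schema's fields with compatible types."""
    record = json.loads(line)
    fields = SCHEMA["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False
    return all(isinstance(record[f["name"]], PY_TYPES[f["type"]]) for f in fields)

lines = [
    '{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}',
    '{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5}',
    '{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000}',
    '{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150}',
]

print(all(matches_schema(l) for l in lines))  # True
```

All four lines pass this check, so the field names and types agree with the schema; the lines in the text file are still plain JSON text rather than Avro-encoded bytes.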
Config for flume: Flume-kafka.conf.properties
agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576
agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = orders-topic
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.batchSize = 20
Can anyone please help me fix this error?
Upvotes: 1
Views: 1316
Reputation: 2313
Generally, an unknown magic byte means your Kafka client and server versions are incompatible. Check that your version of the Cassandra sink was built with Kafka client libraries at a version less than or equal to that of your brokers.
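For reference, Confluent's Avro serializer prefixes each message with a magic byte of 0 followed by a 4-byte big-endian schema ID, and the Avro deserializer reports "Unknown magic byte!" when the first byte is anything else. The following is a minimal Python sketch of that check, not the converter's actual code; the function name `looks_like_confluent_avro` and the fake Avro body are hypothetical. It suggests that a plain JSON line, such as the ones in the question, would not pass, which may be worth ruling out alongside a version mismatch.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format

def looks_like_confluent_avro(payload: bytes) -> bool:
    """Return True if payload starts with magic byte 0 plus a 4-byte schema ID."""
    if len(payload) < 5:
        return False
    # ">bI": big-endian, one signed byte (magic) then one unsigned 32-bit int (schema ID).
    magic, _schema_id = struct.unpack(">bI", payload[:5])
    return magic == MAGIC_BYTE

# A raw JSON line as text; its first byte is "{" (0x7B), not 0.
json_line = b'{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}'

# A framed message: magic byte 0, schema ID 1 (assumed), then a placeholder body.
framed = struct.pack(">bI", 0, 1) + b"avro-body"

print(looks_like_confluent_avro(json_line))  # False
print(looks_like_confluent_avro(framed))     # True
```

Running a check like this against a few raw message values from the topic would show whether they carry the framing that the Avro converter expects.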
Upvotes: 0