srikanth

Error while pushing JSON data from Kafka using Kafka connect to InfluxDB

I'm trying to push JSON data from Kafka to InfluxDB using Kafka Connect. Kafka version: 2.2.0, using the Kafka Connect that ships with Apache Kafka.

Worker properties:

bootstrap.servers=localhost:9092
rest.port=8083
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets1
offset.storage.replication.factor=1
offset.storage.partitions=50
config.storage.topic=connect-configs1
config.storage.replication.factor=1
status.storage.topic=connect-status1
status.storage.replication.factor=1
status.storage.partitions=10
offset.storage.file.filename=/data/md0/kafka/data/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
consumer.max.poll.records=10000

After creating the connector with a curl call to the REST API and pushing data, the logs show the following exception:

[2019-11-14 09:41:33,294] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
        at io.confluent.influxdb.sink.InfluxDBSinkTask.writeRecordsToDB(InfluxDBSinkTask.java:230)
        at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:112)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-11-14 09:41:33,295] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
        at io.confluent.influxdb.sink.InfluxDBSinkTask.writeRecordsToDB(InfluxDBSinkTask.java:230)
        at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:112)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        ... 10 more
[2019-11-14 09:41:33,295] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Answers (1)

OneCricketeer

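Since the error says the sink wants to cast the record value to org.apache.kafka.connect.data.Struct, you need schemas as part of that data, not plain JSON. With value.converter.schemas.enable=false, the JsonConverter turns each JSON object into a schemaless java.util.HashMap, which is exactly the type the ClassCastException complains about.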
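
To see where that HashMap comes from, here is a minimal Java sketch that runs the stock JsonConverter directly: once with schemas disabled (as in the worker config above) and once with a schema/payload envelope. It assumes org.apache.kafka:connect-json (for example version 2.2.0) is on the classpath; the topic name and the host/value fields are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonConverterDemo {

    // Converts raw JSON bytes the way a worker configured with
    // value.converter=org.apache.kafka.connect.json.JsonConverter would.
    static Object toConnectValue(String json, boolean schemasEnabled) {
        JsonConverter converter = new JsonConverter();
        // isKey=false configures it as a value converter
        converter.configure(
            Collections.singletonMap("schemas.enable", String.valueOf(schemasEnabled)), false);
        // "demo-topic" is a hypothetical topic name; JsonConverter does not use it here
        SchemaAndValue result =
            converter.toConnectData("demo-topic", json.getBytes(StandardCharsets.UTF_8));
        return result.value();
    }

    public static void main(String[] args) {
        // Plain JSON with schemas disabled: becomes a schemaless map
        String plain = "{\"host\":\"server01\",\"value\":0.64}";
        System.out.println(toConnectValue(plain, false).getClass().getName());
        // prints java.util.HashMap

        // Same data wrapped in a schema/payload envelope with schemas enabled: becomes a Struct
        String enveloped = "{\"schema\":{\"type\":\"struct\",\"optional\":false,\"fields\":["
            + "{\"field\":\"host\",\"type\":\"string\",\"optional\":false},"
            + "{\"field\":\"value\",\"type\":\"float64\",\"optional\":false}]},"
            + "\"payload\":{\"host\":\"server01\",\"value\":0.64}}";
        System.out.println(toConnectValue(enveloped, true).getClass().getName());
        // prints org.apache.kafka.connect.data.Struct
    }
}
```

A sink that casts the value straight to Struct, as the stack trace shows at InfluxDBSinkTask.java:230, can only handle the second case.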

Therefore, you can do one of the following:

  1. produce Avro data to the topic and use the AvroConverter
  2. update your producer to send JSON in the {"schema": ..., "payload": ... } envelope format and set value.converter.schemas.enable=true
  3. find another InfluxDB sink connector that supports plain (schemaless) JSON.
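
For option 2, the producer has to wrap every record in that envelope. Below is a rough, dependency-free Java sketch of the wrapping; the field names are hypothetical, it only handles flat records of strings, integers, floating-point numbers and booleans, and it does not check which fields the InfluxDB sink actually expects. In a real producer a JSON library such as Jackson would be the more robust choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaEnvelope {

    // Maps a Java value to the Kafka Connect JSON schema type name.
    static String connectType(Object v) {
        if (v == null) throw new IllegalArgumentException("null values need an optional field");
        if (v instanceof String) return "string";
        if (v instanceof Integer) return "int32";
        if (v instanceof Long) return "int64";
        if (v instanceof Float) return "float32";
        if (v instanceof Double) return "float64";
        if (v instanceof Boolean) return "boolean";
        throw new IllegalArgumentException("Unsupported type: " + v.getClass());
    }

    // Minimal JSON string escaping: quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Renders a value as a JSON literal; NaN and Infinity are not valid JSON.
    static String literal(Object v) {
        if (v instanceof String) return quote((String) v);
        if (v instanceof Number && !Double.isFinite(((Number) v).doubleValue())) {
            throw new IllegalArgumentException("Non-finite number: " + v);
        }
        return String.valueOf(v);
    }

    // Builds {"schema": {...struct...}, "payload": {...}} for a flat record.
    static String wrap(Map<String, Object> fields) {
        StringBuilder schema = new StringBuilder("{\"type\":\"struct\",\"optional\":false,\"fields\":[");
        StringBuilder payload = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) { schema.append(','); payload.append(','); }
            first = false;
            schema.append("{\"field\":").append(quote(e.getKey()))
                  .append(",\"type\":\"").append(connectType(e.getValue()))
                  .append("\",\"optional\":false}");
            payload.append(quote(e.getKey())).append(':').append(literal(e.getValue()));
        }
        schema.append("]}");
        payload.append('}');
        return "{\"schema\":" + schema + ",\"payload\":" + payload + "}";
    }

    public static void main(String[] args) {
        Map<String, Object> point = new LinkedHashMap<>();
        point.put("host", "server01"); // hypothetical field names
        point.put("value", 0.64);
        System.out.println(wrap(point));
    }
}
```

The resulting string is what the producer would send as the record value; the worker then needs value.converter.schemas.enable=true so the JsonConverter reads the embedded schema and hands the sink a Struct.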
