Reputation: 15152
I've installed the latest version (7.0.1) of Confluent Platform in standalone mode on an Ubuntu virtual machine.
I'm using this sample Avro producer to stream data to a Kafka topic (pmu214).
The producer seems to work OK; I can post the full code on request. Producer output:
Raw data: {"pmu_id": 2, "time": 1644577001.22, "measurements": [{"stream_id": 2, "stat": "ok", "phasors": [[27.22379, 0.0], [24.672079638784002, -2.075237618663568], [25.11940552135938, 2.10660756475536], [3248.794237867336, -0.06468446412011757], [3068.6629010042793, -2.152472189017548], [2990.0809353594427, 2.031751749658583], [0.0, 0.0], [3101.9477751890026, -0.06193618455080409]], "analog": [], "digital": [0], "frequency": 50.022, "rocof": 0}]}
PMU record b'e5c1e5a8-3e44-465d-98c4-93f896ec1b14' successfully produced to pmu214 [0] at offset 38256
In ksqlDB it seems that the record arrived OK:
rowtime: 2022/02/11 09:48:05.431 Z, key: [26151234-d3dd-4b7c-9222-2867@3486128305426751843/-], value: \x00\x00\x00\x00\x01\x04\xAA\xC3\xB1\xA0\x0C\x04\x04ok\xC2p\xD9A\x0F`\x9F>.b\xC6A\xB9\xC8\xE2\xBF\xC5%\xC8A\x13v\x1A@\x8FVKE\xF9\xF8u>\xC5\xC5?E\xEA\xBA\xEC\xBF\xE0\xFA:E\xD5~\x15@\x00\x00\x00\x00\x00\x00\x00\x00\x84\x07BEs\xD1w>\x04[]\x020b\x02, partition: 0
Index 0 out of bounds for length 0
Topic printing ceased
Here is the command I use to start the connector that writes to PostgreSQL:
bin/connect-standalone etc/kafka/connect-standalone.properties etc/schema-registry/connect-avro-standalone.properties
Here is the content of connect-avro-standalone.properties:
bootstrap.servers=localhost:9092
name=sinkIRIpostgre
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
topics=pmu214
connection.user=mydbuser
connection.password=mypass
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
pk.fields=MESSAGE_KEY
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
I didn't change anything in connect-standalone.properties except plugin.path, which lists the plugins I've installed:
plugin.path=/home/proton/kafkaConnectors/confluentinc-kafka-connect-jdbc-10.3.2,/home/proton/kafkaConverters/confluentinc-kafka-connect-json-schema-converter-7.0.1,/home/proton/kafkaConverters/confluentinc-kafka-connect-avro-converter-7.0.1/
ERROR [sinkIRIpostgre|task-0] WorkerSinkTask{id=sinkIRIpostgre-0} Error converting message key in topic 'pmu214' partition 0 at offset 0 and timestamp 1644560570372: Failed to deserialize data for topic pmu214 to Avro: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pmu214 to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:550)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
Upvotes: 0
Views: 959
Reputation: 191671
If you literally ran the Python sample code, then the key is not Avro, so a failure in key.converter is expected. That is exactly what the log reports:
Error converting message key
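One way to see the mismatch is to look at the raw bytes. Data written by a Schema Registry serializer starts with a zero "magic" byte followed by a 4-byte big-endian schema ID, while a plain string key does not. The sketch below is only an illustration: describe_payload is a hypothetical helper, not part of any library, and the sample bytes are copied from the producer and ksqlDB output in the question.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Schema Registry wire format


def describe_payload(payload: bytes) -> str:
    """Report whether the bytes look like Schema Registry framed data."""
    if len(payload) >= 5 and payload[0] == MAGIC_BYTE:
        # Bytes 1..4 hold the schema ID as a big-endian unsigned int.
        (schema_id,) = struct.unpack(">I", payload[1:5])
        return f"schema-registry framed, schema id {schema_id}"
    return "not schema-registry framed (plain bytes or string)"


# Key as printed by the producer in the question: a UTF-8 UUID string.
key = b"e5c1e5a8-3e44-465d-98c4-93f896ec1b14"
# First bytes of the value as printed by ksqlDB in the question.
value_prefix = b"\x00\x00\x00\x00\x01\x04\xAA\xC3\xB1\xA0\x0C"

print("key:  ", describe_payload(key))
print("value:", describe_payload(value_prefix))
```

If the key really is a plain string, as this suggests, the change to try in the connector properties would be key.converter=org.apache.kafka.connect.storage.StringConverter, leaving value.converter as the Avro converter. I'm assuming here that your producer serializes the key as a string, which I can't confirm without seeing your code.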
Upvotes: 1