Reputation: 333
I am trying to build an infrastructure in which I need to forward messages from one Kafka topic to Elasticsearch and PostgreSQL. My infrastructure is shown in the picture below, and it all runs on the same host. Logstash performs some anonymization and mutations, then sends the document back to Kafka as JSON. Kafka should then forward the message to PostgreSQL and Elasticsearch.
Everything works fine except the connection to PostgreSQL, with which I'm having some trouble.
My config files look as follows:
sink-quickstart-sqlite.properties
name=jdbc-test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#table.name.format=${topic}
topics=processed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable:true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable:true
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=luka
insert.mode=upsert
pk.mode=kafka
pk_fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=ident,auth,response,request,clientip
auto.create=true
auto.evolve=true
confluent-distributed.properties
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
quicstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#topics=test-elasticsearch-sink
topics=processed
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false
The confluent-schema-registry service is running.
I'm getting the following error after running curl http://localhost:8083/connectors/jdbc-sink/status | jq
{
"name": "jdbc-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.50.37:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "192.168.50.37:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
"
}
],
"type": "sink"
}
This is what a message in my "processed" topic looks like (the message in the topic is a single line; it is only formatted here for readability):
{
"ROWTIME": 1587134287569,
"ROWKEY": "null",
"bytes": "4050",
"input": {
"type": "log"
},
"clientip": "156.226.170.95",
"@timestamp": "2020-04-17T14:38:06.346Z",
"timestamp": "17/Apr/2020:14:37:57 +0000",
"@version": "1",
"request": "/lists",
"ident": "536e605f097a92cb07c2a0a81f809f481c5af00d13305f0094057907792ce65e2a62b8ab8ba036f95a840504c3e2f05a",
"httpversion": "1.1",
"auth": "33a7f4a829adfaa60085eca1b84e0ae8f0aa2835d206ac765c22ad440e50d7ae462adafb13306aecfdcd6bd80b52b03f",
"agent": {
"ephemeral_id": "053b9c29-9038-4a89-a2b3-a5d8362460fe",
"version": "7.6.2",
"id": "50e21169-5aa0-496f-b792-3936e9c8de04",
"hostname": "HOSTNAME_OF_MY_AWS_INSTANCE",
"type": "filebeat"
},
"log": {
"offset": 707943,
"file": {
"path": "/home/ec2-user/log/apache.log"
}
},
"host": {
"name": "HOSTNAME_OF_MY_AWS_INSTANCE"
},
"verb": "DELETE",
"ecs": {
"version": "1.4.0"
},
"response": "503"
}
Please let me know if you need some more information.
Upvotes: 0
Views: 930
Reputation: 32090
Your error is here:
DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
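Since this is the JDBC Sink, you must provide a schema with your data. If you have the option, I would suggest using Avro. If not, you must structure your JSON data as Kafka Connect requires: each message has to be an envelope containing exactly a "schema" field and a "payload" field, read with schemas.enable=true on the converter.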
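To make the required JSON structure concrete, here is a minimal Python sketch that wraps a plain record in the "schema"/"payload" envelope that JsonConverter expects when schemas.enable=true. The helper name wrap_with_schema is hypothetical, the field list is taken from fields.whitelist in the question, and treating every field as an optional string is an assumption. Where this wrapping actually happens (in Logstash or in whatever produces to the "processed" topic) is up to you.

```python
import json

# Field names taken from fields.whitelist in the question's sink config.
FIELDS = ["ident", "auth", "response", "request", "clientip"]


def wrap_with_schema(record, fields=FIELDS):
    """Wrap a plain JSON record in a Kafka Connect schema/payload envelope.

    Hypothetical helper: every field is declared as an optional string,
    which is an assumption based on the sample message in the question.
    """
    schema = {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": name, "type": "string", "optional": True}
            for name in fields
        ],
    }
    # Keep only the declared fields; missing ones become null.
    payload = {name: record.get(name) for name in fields}
    return {"schema": schema, "payload": payload}


# Shortened version of the message from the "processed" topic.
plain = {
    "clientip": "156.226.170.95",
    "request": "/lists",
    "response": "503",
    "verb": "DELETE",
}

# The envelope must be written to the topic as a single JSON document.
print(json.dumps(wrap_with_schema(plain)))
```

The top-level object contains only "schema" and "payload", which is what the error message asks for. Nested objects such as "agent" or "log" are left out of the payload here, since they would need their own struct schemas.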
More info: https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s
Upvotes: 1