Luka Klarić

Reputation: 333

Forwarding messages from Kafka to Elasticsearch and Postgresql

I am trying to build an infrastructure in which I need to forward messages from one Kafka topic to both Elasticsearch and PostgreSQL. My infrastructure looks like the picture below, and it all runs on the same host. Logstash does some anonymization and a few mutate filters, and sends the document back to Kafka as JSON. Kafka Connect should then forward the message to PostgreSQL and Elasticsearch.

[infrastructure diagram]

Everything works fine except the connection to PostgreSQL, which is giving me some trouble.

My config files look as follows:

sink-quickstart-sqlite.properties

name=jdbc-test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#table.name.format=${topic}
topics=processed

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable:true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable:true

connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=luka
insert.mode=upsert

pk.mode=kafka

pk_fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=ident,auth,response,request,clientip
auto.create=true
auto.evolve=true

confluent-distributed.properties

group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
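
(For reference, since the worker runs in distributed mode, the connector configuration ends up being submitted to the worker's REST API on port 8083. A minimal sketch of the equivalent REST call for the JDBC sink, with the connector name jdbc-sink taken from the status check further down and the worker assumed to be on localhost:)

curl -X POST http://localhost:8083/connectors \
     -H "Content-Type: application/json" \
     -d '{
           "name": "jdbc-sink",
           "config": {
             "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
             "tasks.max": "1",
             "topics": "processed",
             "connection.url": "jdbc:postgresql://localhost:5432/postgres",
             "connection.user": "postgres",
             "connection.password": "luka",
             "insert.mode": "upsert",
             "pk.mode": "kafka",
             "fields.whitelist": "ident,auth,response,request,clientip",
             "auto.create": "true",
             "auto.evolve": "true"
           }
         }'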

quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#topics=test-elasticsearch-sink
topics=processed
key.ignore=true
connection.url=http://localhost:9200
type.name=kafka-connect
schema.ignore=true
schemas.enable=false

The confluent-schema-registry service is running.

I'm getting the following error when I run curl http://localhost:8083/connectors/jdbc-sink/status | jq:

{
  "name": "jdbc-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.50.37:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "192.168.50.37:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
                    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
                    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
                    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
                    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                    at java.base/java.lang.Thread.run(Thread.java:834)
                Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
                    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
                    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
                    ... 13 more
"
    }
  ],
  "type": "sink"
}

This is what a message in my "processed" topic looks like (the message in the topic is a one-liner; this is just formatted for readability):

{
    "ROWTIME": 1587134287569,
    "ROWKEY": "null",
    "bytes": "4050",
    "input": {
        "type": "log"
    },
    "clientip": "156.226.170.95",
    "@timestamp": "2020-04-17T14:38:06.346Z",
    "timestamp": "17/Apr/2020:14:37:57 +0000",
    "@version": "1",
    "request": "/lists",
    "ident": "536e605f097a92cb07c2a0a81f809f481c5af00d13305f0094057907792ce65e2a62b8ab8ba036f95a840504c3e2f05a",
    "httpversion": "1.1",
    "auth": "33a7f4a829adfaa60085eca1b84e0ae8f0aa2835d206ac765c22ad440e50d7ae462adafb13306aecfdcd6bd80b52b03f",
    "agent": {
        "ephemeral_id": "053b9c29-9038-4a89-a2b3-a5d8362460fe",
        "version": "7.6.2",
        "id": "50e21169-5aa0-496f-b792-3936e9c8de04",
        "hostname": "HOSTNAME_OF_MY_AWS_INSTANCE",
        "type": "filebeat"
    },
    "log": {
        "offset": 707943,
        "file": {
            "path": "/home/ec2-user/log/apache.log"
        }
    },
    "host": {
        "name": "HOSTNAME_OF_MY_AWS_INSTANCE"
    },
    "verb": "DELETE",
    "ecs": {
        "version": "1.4.0"
    },
    "response": "503"
}

Please let me know if you need any more information.

Upvotes: 0

Views: 930

Answers (1)

Robin Moffatt

Reputation: 32090

Your error is here:

DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Since this is the JDBC Sink, you must provide a schema with your data. If you have the option, I would suggest using Avro. If not, you must structure your JSON in the schema/payload envelope that Kafka Connect requires.
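
As a rough sketch: with value.converter.schemas.enable=true, each message value has to be an envelope that carries its own schema alongside the payload, something like this (trimmed here to two of your whitelisted fields):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "clientip", "type": "string", "optional": true },
      { "field": "response", "type": "string", "optional": true }
    ]
  },
  "payload": {
    "clientip": "156.226.170.95",
    "response": "503"
  }
}

Plain JSON like the message you posted has no schema attached, which is exactly what the converter is complaining about.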

More info: https://www.youtube.com/watch?v=b-3qN_tlYR4&t=981s

Upvotes: 1
