Reputation: 63
I am currently working on a Kafka project. I am able to read a file with the FileStream source connector and store the data in a topic.
My configuration:
connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor
However, I am struggling to send the data to my Postgres database with the JDBC sink connector.
My configuration:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=sensor
connection.url=jdbc:postgresql://localhost:5432/pg_data_eng
connection.user=vagrant
connection.password=vagrant
auto.create=true
key.converter=org.apache.kafka.connect.json.JsonConverter
schemas.enable=false
Note that I tried a bunch of different configurations and none of them worked. I can see the error using the REST API:
http://localhost:18083/connectors/jdbc-sink/tasks/0/status
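For example, from a shell:
curl -s http://localhost:18083/connectors/jdbc-sink/tasks/0/status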
And I get this:
{"id":0,"state":"FAILED","worker_id":"127.0.1.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more"}
I can see that Value schema must be of type Struct is the main issue, and that it may be related to the Schema Registry.
I also tried adding value.converter.schema.registry.url=http://localhost:8081, but it still does not work.
I researched some tutorials on the internet, but none of them cover both the file source and the JDBC sink, so my question is: is it even possible to do this?
Upvotes: 1
Views: 353
Reputation: 191738
The issue is that FileStreamSource returns Connect records with a String schema, not a Struct (which is what the JDBC sink and most other connectors expect).
You would have to use a Single Message Transform to wrap the value in a Struct.
connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor
# Add this
transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line
Then with an input file of
Foo
Bar
After the transform, the messages would look like this (consume the topic manually to confirm, as shown below):
{"line":"Foo"}
{"line":"Bar"}
So, your database table would need a single line column of type text.
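Since you set auto.create=true, the sink should create that table for you (by default named after the topic). You can inspect it with psql, assuming the connection details from your sink config:
psql -h localhost -U vagrant -d pg_data_eng -c '\d sensor'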
I also tried adding value.converter.schema.registry.url=http://localhost:8081, but it still does not work.
You would need to use Avro for this to work, not the JsonConverter.
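If you do want to go through the Schema Registry, the sink's converter settings would look something like this instead (a sketch, assuming the registry runs at localhost:8081 as in your attempt):
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
The source connector would then also need to use the AvroConverter so that the records are written to the topic as Avro in the first place.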
Upvotes: 1