Archie

Reputation: 982

Kafka Connect AWS S3 sink connector doesn't read from topic

I have a simple standalone S3 sink connector. Here is the relevant part of the worker configuration properties:

plugin.path = <plugins directory>
bootstrap.servers = <List of servers on Amazon MSK>
security.protocol = SSL
...

It works fine when I connect it to a locally running Kafka. However, when I connect it to a Kafka broker on AWS (with SSL), it doesn't consume anything. There are no errors at all; it behaves as if the topic were empty:

[2020-01-30 10:50:03,597] INFO Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:116)
[2020-01-30 10:50:03,598] INFO WorkerSinkTask{id=xxx} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)

When I enabled DEBUG mode in connect-log4j.properties, I started seeing lots of error messages:

Completed connection to node -2. Fetching API versions. (org.apache.kafka.clients.NetworkClient:914)
Initiating API versions fetch from node -2. (org.apache.kafka.clients.NetworkClient:928)
Connection with YYY disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
...
Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:894)
Initialize connection to node XXX (id: -3 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1125)
Initiating connection to node XXX (id: -3 rack: null) using address XXX (org.apache.kafka.clients.NetworkClient:956)

Am I missing something in the SSL configuration? Note that a manually created org.apache.kafka.clients.consumer.KafkaConsumer can successfully read from this topic with only "security.protocol = SSL" set.

EDIT: Here are the connector properties:

name = my-connector
connector.class = io.confluent.connect.s3.S3SinkConnector
topics = some_topic
timestamp.extractor = Record
locale = de_DE
timezone = UTC
storage.class = io.confluent.connect.s3.storage.S3Storage
partitioner.class = io.confluent.connect.storage.partitioner.HourlyPartitioner
format.class = io.confluent.connect.s3.format.bytearray.ByteArrayFormat
s3.bucket.name = some-s3-bucket
s3.compression.type = gzip
flush.size = 3
s3.region = eu-central-1

Upvotes: 1

Views: 1815

Answers (1)

haykart

Reputation: 957

I had a similar problem. It was solved once I set the security protocol for the consumer explicitly, in addition to the global one. Add the following line to the worker configuration properties:

consumer.security.protocol = SSL
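
For context, here is a sketch of how the worker properties from the question might look with this fix applied. Kafka Connect uses the top-level client settings for the worker's own internal clients, while the consumers it creates for sink tasks take their settings from keys prefixed with consumer. (the prefix is stripped before they are passed on). That would likely explain why the global security.protocol alone was not enough: the sink consumer probably fell back to the default PLAINTEXT protocol and was disconnected by the SSL listener. The truststore lines are an assumption on my part, not something from the original post; they are commented out and use placeholder values.

```properties
# Worker-level settings (as shown in the question)
plugin.path = <plugins directory>
bootstrap.servers = <List of servers on Amazon MSK>
security.protocol = SSL

# Settings for the consumer that sink tasks use to read from the topic.
# Keys prefixed with "consumer." are passed to that consumer with the prefix stripped.
consumer.security.protocol = SSL

# Assumption: only needed if the brokers' certificates are not trusted
# by the JVM's default truststore. Paths and passwords are placeholders.
# ssl.truststore.location = <path to truststore>
# ssl.truststore.password = <truststore password>
# consumer.ssl.truststore.location = <path to truststore>
# consumer.ssl.truststore.password = <truststore password>
```

The same pattern applies to source connectors, where the corresponding prefix is producer. instead of consumer.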

Upvotes: 2
