Reputation: 982
I have a simple standalone S3 sink connector. Here is the relevant part of the worker configuration properties:
plugin.path = <plugins directory>
bootstrap.servers = <List of servers on Amazon MSK>
security.protocol = SSL
...
It works fine when I connect it to a locally running Kafka. However, when I connect it to a Kafka broker on AWS (with SSL), it doesn't consume anything. No errors, nothing. As if the topic were empty:
[2020-01-30 10:50:03,597] INFO Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:116)
[2020-01-30 10:50:03,598] INFO WorkerSinkTask{id=xxx} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:302)
When I enabled DEBUG mode in connect-log4j.properties, I started seeing lots of error messages:
Completed connection to node -2. Fetching API versions. (org.apache.kafka.clients.NetworkClient:914)
Initiating API versions fetch from node -2. (org.apache.kafka.clients.NetworkClient:928)
Connection with YYY disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
...
Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:894)
Initialize connection to node XXX (id: -3 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1125)
Initiating connection to node XXX (id: -3 rack: null) using address XXX (org.apache.kafka.clients.NetworkClient:956)
Am I missing something in the SSL configuration? Note that a manually created org.apache.kafka.clients.consumer.KafkaConsumer can successfully read from this topic with only security.protocol = SSL set.
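For reference, a minimal sketch of such a consumer (broker endpoint, group id, and topic name are placeholders; byte-array deserializers match the connector's ByteArrayFormat):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SslConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder: use the MSK TLS endpoints here
        props.put("bootstrap.servers", "b-1.example.kafka.eu-central-1.amazonaws.com:9094");
        props.put("group.id", "ssl-check");
        // The only SSL-related setting needed for this consumer to work:
        props.put("security.protocol", "SSL");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some_topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}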
EDIT: Here are the connector properties:
name = my-connector
connector.class = io.confluent.connect.s3.S3SinkConnector
topics = some_topic
timestamp.extractor = Record
locale = de_DE
timezone = UTC
storage.class = io.confluent.connect.s3.storage.S3Storage
partitioner.class = io.confluent.connect.storage.partitioner.HourlyPartitioner
format.class = io.confluent.connect.s3.format.bytearray.ByteArrayFormat
s3.bucket.name = some-s3-bucket
s3.compression.type = gzip
flush.size = 3
s3.region = eu-central-1
Upvotes: 1
Views: 1815
Reputation: 957
I had a similar problem, which was solved after I additionally specified the security protocol for the consumer (besides the global one). Just add

consumer.security.protocol = SSL

to the worker configuration properties.
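For reference, the relevant part of the standalone worker properties would then look like this (the host list is a placeholder, as in the question):

bootstrap.servers = <List of servers on Amazon MSK>
# Used by the worker's own internal clients:
security.protocol = SSL
# Passed through to the sink task's consumer (this is the missing piece):
consumer.security.protocol = SSL

As far as I understand, the worker-level security.protocol is not forwarded to the sink task's consumer, so without the consumer. prefix the consumer talks plaintext to the SSL listener and the broker simply drops the connection, which matches the EOFException in the DEBUG log above.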
Upvotes: 2