Eric Bellet

Reputation: 2065

How to connect Apache Kafka with Amazon S3?

I want to store data from Kafka in an S3 bucket using Kafka Connect. I already have a Kafka topic running and an S3 bucket created. My topic has data in Protobuf format. I tried https://github.com/qubole/streamx and got the following error:

 [2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
 [2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
 [2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
 [2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for Group connect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
 [2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught an unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
 java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
 [2018-10-04 13:35:47,194] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
 [2018-10-04 13:35:51,235] INFO Reflections took 6844 ms to scan 259 urls, producing 13517 keys and 95788 values (org.reflections.Reflections:229)

These are the steps I followed:

  1. I cloned the repository.
  2. mvn -DskipTests package
  3. nano config/connect-standalone.properties

    bootstrap.servers=ip-myip.ec2.internal:9092
    key.converter=com.qubole.streamx.ByteArrayConverter
    value.converter=com.qubole.streamx.ByteArrayConverter
    
  4. nano config/quickstart-s3.properties

    name=s3-sink
    connector.class=com.qubole.streamx.s3.S3SinkConnector
    format.class=com.qubole.streamx.SourceFormat
    tasks.max=1
    topics=ssp.impressions
    flush.size=3
    s3.url=s3://myaccess_key:mysecret_key@mybucket/demo
    
  5. connect-standalone /etc/kafka/connect-standalone.properties quickstart-s3.properties

I would like to know whether what I did is correct, or if there is another way to get data from Kafka into S3.

Upvotes: 9

Views: 15965

Answers (2)

Navin Kumar

Reputation: 160

Another way would be to write a consumer that appends messages to rotating log files, and then use cron to upload the files to S3.
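
A minimal sketch of that approach, assuming the confluent-kafka Python client; the broker address, topic, group id, and file paths below are placeholders:

    import logging
    import logging.handlers

    from confluent_kafka import Consumer

    # Write messages to a file that rolls over at ~100 MB, keeping 10 old files.
    handler = logging.handlers.RotatingFileHandler(
        "/var/log/kafka-dump/ssp.impressions.log",
        maxBytes=100 * 1024 * 1024,
        backupCount=10,
    )
    logger = logging.getLogger("kafka-dump")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    consumer = Consumer({
        "bootstrap.servers": "ip-myip.ec2.internal:9092",
        "group.id": "s3-file-dumper",  # hypothetical group id
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["ssp.impressions"])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                continue  # a real consumer should log and handle errors
            # Protobuf payloads are binary, so hex-encode them one per line.
            logger.info(msg.value().hex())
    finally:
        consumer.close()

A cron entry could then ship the rotated files periodically, e.g. with the AWS CLI:

    */5 * * * * aws s3 sync /var/log/kafka-dump/ s3://mybucket/demo/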

Upvotes: 0

Robin Moffatt

Reputation: 32140

You can use Kafka Connect to do this integration, with the Kafka Connect S3 connector.

Kafka Connect is part of Apache Kafka, and the S3 connector is an open-source connector available either standalone or as part of Confluent Platform.
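
For example, a minimal standalone connector config for the Confluent S3 sink might look like this (the bucket name and region are placeholders; AWS credentials are picked up from the standard AWS credentials chain rather than being embedded in a URL):

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=ssp.impressions
    s3.bucket.name=mybucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    # Protobuf payloads can be written out as raw bytes with ByteArrayFormat
    format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
    flush.size=3

You would run it with the standalone worker in the same way as in your question, e.g. connect-standalone config/connect-standalone.properties s3-sink.properties.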

For general information and examples of Kafka Connect, there is a series of blog articles that might help.

Disclaimer: I work for Confluent, and wrote the above blog articles.


April 2020: I have recorded a video showing how to use the S3 sink: https://rmoff.dev/kafka-s3-video

Upvotes: 15
