Vishnu

Reputation: 21

How to configure Apache Kafka File Sink Connector to write JSON data as pipe-separated string and rotate files hourly?

I'm working with Apache Kafka and need to configure the File Sink Connector to achieve the following:

I've explored the configuration options available like transforms, but I'm not sure how to accomplish these requirements. Here's what I need:

Messages in the Kafka topic look like this:

{"key1":"value1","key2":"value2","key3":"value3","key4":"value4","key5":"value5","key6":"value6"}

The output should be as follows:

value1|value2|value3|value4|value5|value6

Files should be rotated every hour, with the timestamp appended to the file name: output.txt_yyyyMMddhh

Any guidance or example configurations would be greatly appreciated!

Sample configuration for the file sink connector:

name=file-sink-connector
connector.class=FileStreamSinkConnector
tasks.max=1
topics=my-topic
file=/path/output.txt

Upvotes: 2

Views: 118

Answers (1)

OneCricketeer

Reputation: 191743

If you want to convert JSON to pipe-delimited data, you'll need to write a custom transform or Converter subclass to do this.

"If a custom converter is needed for the JSON transformation, how should it be implemented and integrated with the connector?"

This requires Java code to be packaged into a JAR:

package com.stackoverflow.example;

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class CustomConverter implements Converter {
  @Override public void configure(Map<String, ?> configs, boolean isKey) { }
  @Override public byte[] fromConnectData(String topic, Schema schema, Object value) {
    throw new UnsupportedOperationException(); // used by source connectors; not needed for a sink
  }
  @Override public SchemaAndValue toConnectData(String topic, byte[] value) {
    return null; // TODO: parse the JSON bytes and return the pipe-delimited string
  }
}

Then place the packaged JAR in a directory on the plugin.path of the Connect worker(s).

Then referenced as value.converter=com.stackoverflow.example.CustomConverter.
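As a minimal sketch of the conversion logic itself, the snippet below shows how flat JSON (as in the question, with string values only) could be flattened into a pipe-delimited string. The class and method names here are hypothetical, and the regex-based parsing is only for illustration; a real converter should use a proper JSON library such as Jackson and call this kind of helper from toConnectData().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper a custom Converter could delegate to.
// Assumes flat JSON with string values only, as in the question.
public class JsonToPipe {
  // Matches "key":"value" pairs, capturing the value
  private static final Pattern FIELD =
      Pattern.compile("\"[^\"]+\"\\s*:\\s*\"([^\"]*)\"");

  public static String flatten(String json) {
    List<String> values = new ArrayList<>();
    Matcher m = FIELD.matcher(json);
    while (m.find()) {
      values.add(m.group(1)); // collect each field's value in order
    }
    return String.join("|", values);
  }

  public static void main(String[] args) {
    String json = "{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}";
    System.out.println(flatten(json)); // value1|value2|value3
  }
}
```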

A custom transform would be similar, implementing the Transformation interface instead.


The built-in FileStreamSinkConnector does not support file rotation. The S3 Sink connector, for example, does, and you could run a tool like the MinIO server to emulate a local S3 endpoint backed by your local filesystem.
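For the rotation piece, a sketch of an S3 Sink configuration using a time-based partitioner to bucket records into hourly paths might look like the following (the bucket name, topic, and flush size are placeholders, not a tested config):

```properties
# Illustrative S3 Sink config with hourly rotation
name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my-topic
s3.bucket.name=my-bucket
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
flush.size=1000
# Partition output into hourly directories
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC
# Close and commit files at least once per hour
rotate.interval.ms=3600000
timestamp.extractor=Record
```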

Upvotes: 1
