Reputation: 21
I'm working with Apache Kafka and need to configure the File Sink Connector to achieve the following:
Convert JSON data to pipe-separated strings: The Kafka topic contains messages in JSON format, and I need the connector to write these messages to a file in a pipe-separated format.
File rotation: The connector should rotate the files every hour.
I've explored the configuration options available like transforms, but I'm not sure how to accomplish these requirements. Here's what I need:
The Kafka topic messages look like this:
{"key1":"value1","key2":"value2","key3":"value3","key4":"value4","key5":"value5","key6":"value6"}
The output should be as follows:
value1|value2|value3|value4|value5|value6
Files should be rotated every hour, with an hourly timestamp suffix: output.txt_yyyyMMddhh
Any guidance or example configurations would be greatly appreciated!
Here is my sample configuration for the file sink connector:
name=file-sink-connector
connector.class=FileStreamSinkConnector
tasks.max=1
topics=my-topic
file=/path/output.txt
Upvotes: 2
Views: 118
Reputation: 191743
If you want to convert JSON to pipe-delimited data, you'll need to write a custom transform (SMT) or Converter implementation to do this.
If a custom converter is needed for the JSON transformation, how should it be implemented and integrated with the connector?
This requires Java code to be written and packaged as a JAR:
package com.stackoverflow.example;

import org.apache.kafka.connect.storage.Converter;

public class CustomConverter implements Converter {
    // TODO: implement configure(), fromConnectData(), and toConnectData().
    // For a sink, toConnectData() is where the JSON bytes from the topic
    // would be parsed and re-serialized as a pipe-delimited string.
}
Then place the JAR in a plugin.path directory of the Connect worker(s), and reference it as value.converter=com.stackoverflow.example.CustomConverter.
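The core of such a converter is turning the JSON bytes into a pipe-joined string. Here is a minimal, dependency-free sketch of that logic (the class and method names are illustrative, not part of the Connect API). It only handles flat JSON objects with string values; a real implementation should use a proper JSON library such as Jackson, which the Connect runtime already ships with:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PipeJoiner {
    // Captures the string value of each "key":"value" pair in a flat JSON object.
    // Naive: breaks on escaped quotes, nested objects, or non-string values.
    private static final Pattern VALUE = Pattern.compile(":\\s*\"(.*?)\"");

    // The logic a custom Converter's toConnectData() would delegate to:
    // extract the values from a flat JSON record and join them with pipes.
    public static String toPipeDelimited(String json) {
        List<String> values = new ArrayList<>();
        Matcher m = VALUE.matcher(json);
        while (m.find()) {
            values.add(m.group(1));
        }
        return String.join("|", values);
    }

    public static void main(String[] args) {
        String json = "{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":\"value3\"}";
        System.out.println(toPipeDelimited(json)); // prints value1|value2|value3
    }
}
```

Inside a real Converter, toConnectData() would apply this to the message bytes and return a SchemaAndValue wrapping the resulting string, since FileStreamSinkConnector writes each record's value via toString().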
A custom transforms entry would be similar, with a different interface (Transformation rather than Converter).
The built-in FileStreamSinkConnector does not support file rotation. The S3 Sink connector, for example, does, and you could use a tool like MinIO to emulate an S3 endpoint backed by your local filesystem.
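For hourly rotation with the Confluent S3 Sink (a separate plugin, not bundled with Apache Kafka), the relevant properties would look roughly like this; the bucket name and store.url endpoint are placeholders for your MinIO setup:

```
name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=my-topic
s3.bucket.name=my-bucket
store.url=http://localhost:9000
# rotate on the wall clock every hour (3600000 ms)
rotate.schedule.interval.ms=3600000
# partition output paths by time, one directory per hour
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en-US
timezone=UTC
```

This gives time-bucketed object keys rather than the exact output.txt_yyyyMMddhh naming you asked for, but it achieves the same hourly-rotation goal.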
Upvotes: 1