When I stream this JSON with the console producer:
{"id":1337,"status":"example_topic_1 success"}
I get this from my FileStreamSink connector:
{id=1337, status=example_topic_1 success}
This is a major problem for me, because the original JSON message cannot be recovered without making assumptions about where the quotes used to be. How can I output the messages to a file while preserving the quotation marks?
First, I start the Kafka Connect worker in standalone mode:
# sh bin/connect-standalone.sh \
> config/worker.properties \
> config/connect-file-sink-example_topic_1.properties
In another terminal, I start a console consumer on the same topic so I can compare the output:
# sh bin/kafka-console-consumer.sh \
> --bootstrap-server kafka_broker:9092 \
> --topic example_topic_1
Finally, I start a console producer for sending messages, and I enter a message.
# sh bin/kafka-console-producer.sh \
> --broker-list kafka_broker:9092 \
> --topic example_topic_1
From the console consumer, the message pops out correctly, with quotes.
{"id":1337,"status":"example_topic_1 success"}
But I get this from the FileStreamSink connector:
{id=1337, status=example_topic_1 success}
My worker.properties:
offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
My connect-file-sink-example_topic_1.properties:
name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1
Upvotes: 2
Views: 1139
I came across this question while trying to solve a similar issue, and although it is old, I thought I'd post my answer.
My solution involves creating my own implementation of org.apache.kafka.connect.sink.SinkTask. In the task, where the Collection of org.apache.kafka.connect.sink.SinkRecord is handled, I convert the Map (which the JsonConverter outputs) back to a JSON string using a Jackson ObjectMapper.
So the connect-standalone.properties contains this:
value.converter=org.apache.kafka.connect.json.JsonConverter
Then in my class that extends SinkTask:
@Override
public void put(Collection<SinkRecord> records) {
    ObjectMapper objectMapper = new ObjectMapper();
    for (SinkRecord record : records) {
        Object recordValue = record.value();
        // JsonConverter with schemas.enable=false hands schemaless JSON
        // objects to the task as java.util.Map instances.
        if (recordValue instanceof Map) {
            try {
                // Serialize the Map back to a proper JSON string.
                recordValue = objectMapper.writeValueAsString(recordValue);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
        // ... write recordValue to the output file ...
    }
}
This means in your example:
{"id":1337,"status":"example_topic_1 success"}
is written out as a valid JSON string. Because you keep the JsonConverter, you can still process the JSON in Connect transforms; I use org.apache.kafka.connect.transforms.ExtractField$Value to extract a section of the JSON as the value.
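For reference, a hypothetical connector-config fragment wiring up such a transform (the transform alias extract and the field name status are illustrative, taken from the example message):

```
transforms=extract
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=status
```

With this in place the sink task receives only the extracted field rather than the whole object.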
Upvotes: 0
Since you're not actually wanting to parse the JSON data, but just pass it straight through as a lump of text, you need to use the StringConverter:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
This article explains more about the nuances of converters: https://rmoff.net/2019/05/08/when-a-kafka-connect-converter-is-not-a-converter/. It shows an example of what you're trying to do, although it uses kafkacat in place of the console producer/consumer.
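To see why the quotes vanish in the first place: with JsonConverter and schemas.enable=false, the schemaless JSON object reaches the FileStreamSink task as a java.util.Map, and the sink prints the record value, which falls back to Java's Map.toString(). A stdlib-only sketch of that behavior (the class name MapToStringDemo is illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MapToStringDemo {
    public static void main(String[] args) {
        // JsonConverter (schemas.enable=false) deserializes the JSON object
        // into a Map; printing that Map uses Map.toString(), which drops
        // the quotes and swaps ":" for "=".
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("id", 1337);
        value.put("status", "example_topic_1 success");
        System.out.println(value); // {id=1337, status=example_topic_1 success}
    }
}
```

The StringConverter avoids this entirely: it never deserializes the payload, so the JSON bytes pass through to the file untouched.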
Upvotes: 5