Reputation: 14350
I have a topic titled newtest
in Kafka with three messages:
Hello
Is anybody out there
Can you hear me
...and I have the following config for a connect job:
{
"name":"connect-test-9",
"config":
{
"connector.class":"FileStreamSink",
"file":"connector-test",
"topics":"newtest",
"name":"connect-test-9",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"false",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"transforms":"Hoist, AddTimestamp",
"transforms.Hoist.type":"org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.Hoist.field":"line",
"transforms.AddTimestamp.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddTimestamp.timestamp.field":"Timestamp"
}
}
I'm getting the following output in file connector-test
:
Struct{line=Hello,Timestamp=Mon Mar 12 14:50:34 PDT 2018}
Struct{line=Is anybody out there,Timestamp=Mon Mar 12 14:50:44 PDT 2018}
Struct{line=Can you hear me,Timestamp=Mon Mar 12 14:50:52 PDT 2018}
I would like to get this:
{"line":"Hello","Timestamp":"Mon Mar 12 14:50:34 PDT 2018"}
{"line":"Is anybody out there","Timestamp":"Mon Mar 12 14:50:44 PDT 2018"}
{"line":"Can you hear me","Timestamp":"Mon Mar 12 14:50:52 PDT 2018"}
I've tried changing the value.converter, no good (parse exception). I also have another topic where the message is already Json, and the parse succeeds there, and I can add a Timestamp without Hoist. But my output is the same non-Json format {key1=value1,key2=value2}
.
Any way I can get the output in proper JSON?
This is the parse exception that I see:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Can': was expecting ('true', 'false', or 'null')
Upvotes: 1
Views: 11625
Reputation: 191743
To get JSON output, you need to use the JsonConverter
rather than the StringConverter
the converter happens before the sink, and after the consumer deserialization
Your data is already in Kafka, and you used a Source Connector, perhaps with a StringConveter
to ingest, then you convert to the internal Struct
, which can be setup with a Sink Connector and a different Converter type
Upvotes: 3