Reputation: 886
We are going to deploy Apache Kafka 2.10 on our project and communicate via JSON objects between producer and consumer.
So far I suppose I need to:
Regarding first point, I believe it should be something like this:
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
Where T data
could be passed as string "{\"key\" : \"value\"}"
.
However by now there are problems with 2-4 points. I tried this in custom deserializer:
@Override
public JsonNode deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;
JsonNode data;
try {
objectMapper = new ObjectMapper();
data = objectMapper.readTree(bytes);
} catch (Exception e) {
throw new SerializationException(e);
}
return data;
}
and in my Consumer I tried:
KafkaConsumer<String, TextNode> consumer = new KafkaConsumer<String, TextNode>(messageConsumer.properties);
consumer.subscribe(Arrays.asList(messageConsumer.topicName));
int i = 0;
while (true) {
ConsumerRecords<String, TextNode> records = consumer.poll(100);
for (ConsumerRecord<String, TextNode> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value().asText());
}
}
I thought this will produce a proper original json string, but all I got invoking record.value().asText()
was some hash string "IntcImtleVwiIDogXCJ2YWx1ZVwiIH0i"
.
Any advice or example of communicating via JSON in kafka would be greatly appreciated.
Upvotes: 1
Views: 17990
Reputation: 4024
I recommend you to use UTF-8 encoding as string JSON serializer:
1. Producer gets the data as a JSON string ("{\"key\" : \"value\"}"
)
2. Producer serialize the JSON string to bytes using UTF-8 (jsonString.getBytes(StandardCharsets.UTF_8);
)
3. Producer sends this bytes to Kafka
4. Consumer reading the bytes from Kafka
5. Consumer deserializing the bytes to JSON string using UTF-8 (new String(consumedByteArray, StandardCharsets.UTF_8);
)
6. Consumer doing whatever it needs to with the JSON string
I intentionally didn't use your code so the flow will be understandable, I think you can apply this example to your project very easily :)
Upvotes: 1
Reputation: 2313
There a built-in JsonSerializer with Apache Kafka. I'm not sure what 2.10 means for a version but the current version 0.10.0.1 definitely can do the serialization for you. Just look for the JsonSerializer class.
Upvotes: 0