griboedov

Reputation: 886

Producing and consuming JSON in KAFKA

We are going to deploy Apache Kafka 2.10 on our project and communicate via JSON objects between producer and consumer.

So far I suppose I need to:

  1. Implement a custom serializer to convert JSON into a byte array
  2. Implement a custom deserializer to convert a byte array into a JSON object
  3. Produce the message
  4. Read the message in Consumer class

Regarding the first point, I believe it should look something like this:

@Override
public byte[] serialize(String topic, T data) {
    if (data == null)
        return null;
    try {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsBytes(data);
    } catch (Exception e) {
        throw new SerializationException("Error serializing JSON message", e);
    }
}

Where T data could be passed as the string "{\"key\" : \"value\"}".
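For reference, here is what Jackson actually does when data is a raw String: it emits a JSON *string literal*, i.e. the input wrapped in an extra layer of quotes with the inner quotes escaped, not the JSON object itself (the class name below is just for illustration):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class SerializeStringDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = "{\"key\" : \"value\"}";
        // Serializing a java.lang.String does NOT pass it through as-is:
        // the result is the input re-encoded as a quoted JSON string.
        String out = mapper.writeValueAsString(json);
        System.out.println(out); // prints "{\"key\" : \"value\"}" including the outer quotes
    }
}
```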

However, I am now running into problems with points 2-4. This is what I tried in the custom deserializer:

@Override
public JsonNode deserialize(String topic, byte[] bytes) {
    if (bytes == null)
        return null;

    JsonNode data;
    try {
        ObjectMapper objectMapper = new ObjectMapper();
        data = objectMapper.readTree(bytes);
    } catch (Exception e) {
        throw new SerializationException(e);
    }
    return data;
}
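A detail worth checking here: what readTree returns depends entirely on what was written to the topic. Bytes containing a JSON object parse to an ObjectNode, while bytes containing a quoted JSON string parse to a TextNode. A quick standalone check (class name is illustrative):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;

public class DeserializeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Bytes of a JSON object -> ObjectNode with accessible fields
        JsonNode obj = mapper.readTree("{\"key\" : \"value\"}".getBytes(StandardCharsets.UTF_8));
        // Bytes of a quoted JSON string -> TextNode
        JsonNode txt = mapper.readTree("\"hello\"".getBytes(StandardCharsets.UTF_8));
        System.out.println(obj.get("key").asText()); // value
        System.out.println(txt.asText());            // hello
    }
}
```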

and in my Consumer I tried:

    KafkaConsumer<String, TextNode> consumer = new KafkaConsumer<String, TextNode>(messageConsumer.properties);
    consumer.subscribe(Arrays.asList(messageConsumer.topicName));
    int i = 0;
    while (true) {
        ConsumerRecords<String, TextNode> records = consumer.poll(100);
        for (ConsumerRecord<String, TextNode> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value().asText());
        }
    }

I thought this would reproduce the original JSON string, but all I got from invoking record.value().asText() was some encoded string, "IntcImtleVwiIDogXCJ2YWx1ZVwiIH0i".

Any advice or example of communicating via JSON in kafka would be greatly appreciated.

Upvotes: 1

Views: 17990

Answers (2)

Ofek Hod

Reputation: 4024

I recommend you use UTF-8 encoding and treat the JSON as a plain string:

  1. The producer gets the data as a JSON string ("{\"key\" : \"value\"}")
  2. The producer serializes the JSON string to bytes using UTF-8 (jsonString.getBytes(StandardCharsets.UTF_8))
  3. The producer sends these bytes to Kafka
  4. The consumer reads the bytes from Kafka
  5. The consumer deserializes the bytes back to a JSON string using UTF-8 (new String(consumedByteArray, StandardCharsets.UTF_8))
  6. The consumer does whatever it needs to with the JSON string

I intentionally didn't use your code so the flow would be clear; I think you can apply this example to your project very easily :)
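The byte-level round trip in steps 2 and 5 can be sketched like this, using only the standard library (the class name is illustrative; the Kafka producer/consumer plumbing is left out):

```java
import java.nio.charset.StandardCharsets;

public class Utf8JsonRoundTrip {
    public static void main(String[] args) {
        String json = "{\"key\" : \"value\"}";
        // Producer side: JSON string -> bytes (this is what goes on the wire)
        byte[] wire = json.getBytes(StandardCharsets.UTF_8);
        // Consumer side: bytes -> JSON string, using the same charset
        String back = new String(wire, StandardCharsets.UTF_8);
        System.out.println(back.equals(json)); // true
    }
}
```

Because both sides agree on UTF-8, no information is lost and the consumer gets back exactly the string the producer sent.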

Upvotes: 1

dawsaw

Reputation: 2313

There is a built-in JsonSerializer that ships with Apache Kafka. I'm not sure what 2.10 refers to as a version, but the current release, 0.10.0.1, can definitely do the serialization for you. Just look for the JsonSerializer class.
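Wiring it up would look roughly like this; assuming the JsonSerializer in question is org.apache.kafka.connect.json.JsonSerializer, which ships in the connect-json artifact rather than kafka-clients, and with a placeholder broker address:

```java
import java.util.Properties;

public class JsonSerializerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Kafka's own JSON serializer; requires the connect-json jar on the classpath
        props.put("value.serializer",
                "org.apache.kafka.connect.json.JsonSerializer");
        System.out.println(props.getProperty("value.serializer"));
    }
}
```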

Upvotes: 0
