Leah Ili

Reputation: 1

Kafka Streams creates empty schema from JSONObject

I'm writing a Kafka Streams application that transforms data received from a non-relational custom Kafka source connector and splits it into multiple topics to normalize it (so that it can be used by the JDBC sink connector afterwards). The ingested data has a schema that's saved in a self-hosted schema registry and is deserialized to an org.json.JSONObject using io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde.

After processing the stream, I have a JSONObject that I want to write to Kafka. I tried serializing it with a schema using io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde again, but I get this warning:

WARN com.kjetland.jackson.jsonSchema.JsonSchemaGenerator - Not able to generate jsonSchema-info for type: [simple type, class org.json.JSONObject] - probably using custom serializer which does not override acceptJsonFormatVisitor

and the schema that gets created is empty:

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "JSON Object"
}

Here is the code where I write again to Kafka after processing the input stream:

// process primitive types (remove all non-primitive types in each record)
KStream<String, JSONObject> primitiveStream = inputStream.mapValues(value -> {
    JSONObject inputObject = value;
    // loop over a copy of the keys: calling remove() while iterating the live
    // keySet() can throw ConcurrentModificationException
    for (String key : inputObject.keySet().toArray(new String[0])) {
        if (inputObject.get(key) instanceof JSONObject) {
            logger.info("Key: '{}' removed, is an object.", key);
            inputObject.remove(key);
        } else if (inputObject.get(key) instanceof JSONArray) {
            logger.info("Key: '{}' removed, is an array.", key);
            inputObject.remove(key);
        }
    }
    return inputObject;
});

KafkaJsonSchemaSerde<JSONObject> jsonobjserde = new KafkaJsonSchemaSerde<>(JSONObject.class);
Map<String, Object> jsonserdeConf = new HashMap<>();
jsonserdeConf.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
jsonobjserde.configure(jsonserdeConf, false);

primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), jsonobjserde));

I had trouble finding anything online that could help me find a solution. I thought about manually creating the schema and passing it along (like I do in my custom connector), but I couldn't find a way to do that with Kafka Streams. I also tried manually adding the schema to the schema registry and turning off schema autogeneration in the Kafka Streams application, but that just led to an error saying the schema couldn't be found.
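One way to build that schema by hand: after the primitive-stripping step every remaining value is a string, number, boolean or null, so a draft-07 schema can be derived from a record's values. The helper below is a hypothetical, stdlib-only sketch (not a Kafka or Confluent API); it assumes a flat Map<String, Object> such as the one JSONObject.toMap() returns, and keys that need no JSON escaping.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka or Confluent: builds a draft-07 JSON Schema
// from one flat record. Assumes values are String, Boolean, Number or null, and that
// keys need no JSON escaping.
final class FlatSchemaInference {

    // Maps a Java value to the JSON Schema "type" keyword.
    static String jsonType(Object value) {
        if (value == null) return "null";
        if (value instanceof String) return "string";
        if (value instanceof Boolean) return "boolean";
        if (value instanceof Integer || value instanceof Long || value instanceof Short
                || value instanceof Byte || value instanceof BigInteger) return "integer";
        if (value instanceof Number) return "number"; // Double, Float, BigDecimal, ...
        throw new IllegalArgumentException("Nested or unsupported value: " + value.getClass());
    }

    // Builds the schema; the TreeMap sorts the keys so the output is deterministic.
    static String inferSchema(String title, Map<String, Object> record) {
        StringJoiner props = new StringJoiner(",");
        for (Map.Entry<String, Object> e : new TreeMap<>(record).entrySet()) {
            props.add("\"" + e.getKey() + "\":{\"type\":\"" + jsonType(e.getValue()) + "\"}");
        }
        return "{\"$schema\":\"http://json-schema.org/draft-07/schema#\","
                + "\"title\":\"" + title + "\","
                + "\"type\":\"object\","
                + "\"properties\":{" + props + "}}";
    }
}
```

Types taken from a single record are only a starting point: a field that happens to be null in that record comes out as "null", so a real schema would merge types across records (for example ["string", "null"]) or be written by hand. How the resulting string reaches the serializer depends on the Confluent version; recent releases document JsonSchemaUtils.envelope for pairing a JsonNode payload with an explicit schema, which is worth checking for the version in use.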

I would be very grateful for any help on this matter!

Upvotes: 0

Views: 31

Answers (1)

Ajay Takur

Reputation: 6236

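The warning means the schema generator cannot derive a schema from the org.json.JSONObject class. The generator works from the Java type via Jackson, and a JSONObject's keys only exist at runtime, so there is nothing for it to describe and it falls back to an empty schema.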

Create a Custom Serializer: Since JSONObject is not directly supported by KafkaJsonSchemaSerde, you can create a custom serializer that handles JSONObject and supplies an explicit schema for it.

Use Jackson Annotations: If your JSONObject represents a more structured data type, consider defining a Java class with Jackson annotations to represent the data. This way, KafkaJsonSchemaSerde can generate a schema for the class.
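The reason a typed class helps is that a schema generator can only describe fields the Java type declares, while a JSONObject's keys exist only at runtime. The toy below illustrates this with the standard library alone (Java 16+); it is not Jackson's or Confluent's generator, and the Item record is a hypothetical stand-in for one flattened row.

```java
import java.lang.reflect.RecordComponent;
import java.util.StringJoiner;

// Toy illustration (not Jackson's or Confluent's generator): a schema can be derived
// from a type only because the type declares its fields. Requires Java 16+.
final class RecordSchemaSketch {

    // Hypothetical typed representation of one flattened row.
    record Item(long id, String name, double price, boolean active) {}

    // Maps a declared Java field type to the JSON Schema "type" keyword.
    static String typeOf(Class<?> c) {
        if (c == String.class) return "string";
        if (c == boolean.class || c == Boolean.class) return "boolean";
        if (c == int.class || c == long.class || c == Integer.class || c == Long.class) return "integer";
        if (c == double.class || c == float.class || c == Double.class || c == Float.class) return "number";
        throw new IllegalArgumentException("Unsupported field type: " + c);
    }

    // Reads the record's declared components (in declaration order) into "properties".
    static String schemaFor(Class<? extends Record> type) {
        StringJoiner props = new StringJoiner(",");
        for (RecordComponent rc : type.getRecordComponents()) {
            props.add("\"" + rc.getName() + "\":{\"type\":\"" + typeOf(rc.getType()) + "\"}");
        }
        return "{\"type\":\"object\",\"properties\":{" + props + "}}";
    }
}
```

Serializing instances of such a class (for example through a KafkaJsonSchemaSerde<Item>) should give the real generator the same field information; a plain POJO with getters is the conservative choice if the Jackson version in use predates record support.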

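Here's an example of a custom serializer for JSONObject. Because no schema can be derived from the JSONObject class, it takes the schema as a JSON Schema string and wraps each payload together with that schema before handing it to KafkaJsonSchemaSerializer:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.Map;

public class JSONObjectSerializer implements Serializer<JSONObject> {
    // Confluent serializer that registers/looks up the schema and writes the wire format
    private final KafkaJsonSchemaSerializer<JsonNode> delegate = new KafkaJsonSchemaSerializer<>();
    private final ObjectMapper objectMapper = new ObjectMapper();
    // Explicit schema, because none can be generated from the JSONObject class
    private final JsonSchema schema;

    public JSONObjectSerializer(String schemaRegistryUrl, String schemaString) {
        this.schema = new JsonSchema(schemaString);
        Map<String, Object> config = new HashMap<>();
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        delegate.configure(config, false);
    }

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        if (data == null) {
            return null;
        }
        try {
            // Jackson cannot introspect JSONObject, so re-parse its JSON text into a JsonNode
            JsonNode payload = objectMapper.readTree(data.toString());
            // Pair payload and schema; check that your Confluent version provides JsonSchemaUtils.envelope
            return delegate.serialize(topic, JsonSchemaUtils.envelope(schema, payload));
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSONObject", e);
        }
    }

    @Override
    public void close() {
        delegate.close();
    }
}

In your Kafka Streams application, you can then use this custom serializer. Produced.with needs a complete Serde, so wrap it with Serdes.serdeFrom; this sink never reads, so the deserializer half can simply reject calls (primitiveSchema is a placeholder for your schema string):

JSONObjectSerializer serializer = new JSONObjectSerializer(schemaRegistryUrl, primitiveSchema);
Serde<JSONObject> jsonObjectSerde = Serdes.serdeFrom(serializer,
        (topic, bytes) -> { throw new UnsupportedOperationException("write-only serde"); });
primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), jsonObjectSerde));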

This approach should help you serialize the JSONObject with a schema.
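Whichever serializer writes the records, Confluent's JSON Schema deserializer and converter expect the Schema Registry wire format: a magic byte 0, the 4-byte big-endian schema ID, then the JSON payload. KafkaJsonSchemaSerializer produces this framing itself; the stdlib sketch below only shows the layout, and any schema ID passed to it in an example is a placeholder rather than one fetched from a registry.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the framing Confluent's Schema Registry serializers write:
// one magic byte (0), a 4-byte big-endian schema ID, then the payload bytes.
// A real serializer obtains the schema ID from the registry.
final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;

    // Prepends the magic byte and schema ID to the UTF-8 JSON payload.
    static byte[] frame(int schemaId, String jsonPayload) {
        byte[] payload = jsonPayload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + Integer.BYTES + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }
}
```

For example, WireFormat.frame(7, "{\"id\":1}") yields 13 bytes: 0, then 0 0 0 7, then the 8 payload bytes.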

Upvotes: 0
