Reputation: 1
I'm writing a Kafka Streams application that transforms data received from a non-relational custom Kafka source connector and splits it into multiple topics to normalize it (so that the JDBC sink connector can use it afterwards). The ingested data has a schema that is saved in a self-hosted schema registry, and it is deserialized to an org.json.JSONObject using the io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde.
After processing the stream, I have a JSONObject that I want to write to Kafka. I tried using io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde again to serialize with a schema, but I get the error
WARN com.kjetland.jackson.jsonSchema.JsonSchemaGenerator - Not able to generate jsonSchema-info for type: [simple type, class org.json.JSONObject] - probably using custom serializer which does not override acceptJsonFormatVisitor
and an empty schema is created:
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "JSON Object"
}
Here is the code where I write again to Kafka after processing the input stream:
// process primitive types (remove all non-primitive types in each record)
KStream<String, JSONObject> primitiveStream = inputStream.mapValues(value -> {
    JSONObject inputObject = value;
    // Iterate over a copy of the key set (needs java.util.ArrayList): removing entries
    // while iterating the live key set can throw a ConcurrentModificationException.
    for (String key : new ArrayList<>(inputObject.keySet())) {
        if (inputObject.get(key) instanceof JSONObject) {
            logger.info("Key: '{}' removed, is an object.", key);
            inputObject.remove(key);
        } else if (inputObject.get(key) instanceof JSONArray) {
            logger.info("Key: '{}' removed, is an array.", key);
            inputObject.remove(key);
        }
    }
    return inputObject;
});

// value serde that should register a schema for the output records
KafkaJsonSchemaSerde<JSONObject> jsonobjserde = new KafkaJsonSchemaSerde<>(JSONObject.class);
Map<String, Object> jsonserdeConf = new HashMap<>();
jsonserdeConf.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
jsonobjserde.configure(jsonserdeConf, false); // false = configure as a value serde
primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), jsonobjserde));
I had trouble finding anything online that could help me with a solution. I thought about manually creating the schema and passing it along (like I do in my custom connector), but I couldn't find any way to do that with Kafka Streams. I also tried manually adding the schema to the schema registry and turning off automatic schema registration in the Kafka Streams application, but that just led to an error that the schema couldn't be found.
I would be very grateful for any help on this matter!
Upvotes: 0
Views: 31
Reputation: 6236
The error you're seeing suggests that the serializer isn't able to generate a schema for JSONObject.
Create a Custom Serializer: Since JSONObject is not directly supported by KafkaJsonSchemaSerde, you can create a custom serializer that handles JSONObject and generates a schema for it.
Use Jackson Annotations: If your JSONObject represents a more structured data type, consider defining a Java class with Jackson annotations to represent the data. This way, KafkaJsonSchemaSerde can generate a schema for the class.
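As a rough illustration of the second option, here is a minimal sketch of such a class. The class name PrimitiveRecord and the fields id, name, and active are hypothetical placeholders, not taken from the question; the real fields depend on what the source connector emits. Jackson annotations are left out so that the sketch compiles with the JDK alone; in a real application the fields could carry annotations such as @JsonProperty. The fromMap helper is also an assumption: it expects a plain map such as the one JSONObject.toMap() returns.

```java
import java.util.Map;

// Hypothetical typed record; the field names are illustrative only.
class PrimitiveRecord {
    private long id;
    private String name;
    private boolean active;

    // No-arg constructor plus getters/setters, the usual bean shape
    // that Jackson-based tooling can introspect.
    public PrimitiveRecord() {}

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public boolean isActive() { return active; }
    public void setActive(boolean active) { this.active = active; }

    // Builds a record from a plain map of primitive values.
    // Assumes "id" is present and numeric; "name" may be null;
    // a missing or non-true "active" is treated as false.
    static PrimitiveRecord fromMap(Map<String, Object> values) {
        PrimitiveRecord record = new PrimitiveRecord();
        record.setId(((Number) values.get("id")).longValue());
        record.setName((String) values.get("name"));
        record.setActive(Boolean.TRUE.equals(values.get("active")));
        return record;
    }
}
```

With a class like this, the stream could map each JSONObject to a PrimitiveRecord and use a KafkaJsonSchemaSerde constructed with PrimitiveRecord.class instead of JSONObject.class, so the schema generator has concrete fields to inspect. This only fits if the set of fields is known in advance.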
Here's an example of how you can create a custom serializer for JSONObject:
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;

public class JSONObjectSerializer implements Serializer<JSONObject> {
    // Delegate that talks to the schema registry; it is handed plain Maps, not JSONObject.
    private final KafkaJsonSchemaSerializer<Map<String, Object>> jsonSchemaSerializer;

    public JSONObjectSerializer(String schemaRegistryUrl) {
        jsonSchemaSerializer = new KafkaJsonSchemaSerializer<>();
        Map<String, Object> config = new HashMap<>();
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        jsonSchemaSerializer.configure(config, false); // false = value serializer
    }

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        if (data == null) {
            return null; // pass null values (tombstones) through unchanged
        }
        try {
            // JSONObject.toMap() yields plain Maps and Lists that Jackson can serialize.
            return jsonSchemaSerializer.serialize(topic, data.toMap());
        } catch (Exception e) {
            throw new RuntimeException("Error serializing JSONObject", e);
        }
    }
}
In your Kafka Streams application, you can then use this custom serializer:
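JSONObjectSerializer serializer = new JSONObjectSerializer(schemaRegistryUrl);
// Serde is an interface, so wrap the serializer via Serdes.serdeFrom (deserializer reused from the question's jsonobjserde)
Serde<JSONObject> outputSerde = Serdes.serdeFrom(serializer, jsonobjserde.deserializer());
primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), outputSerde));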
This approach should help you serialize the JSONObject with a schema.
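One caveat: a schema derived from an untyped Map is likely to stay fairly generic, because the field names and types are only known at runtime. If a more specific schema is needed, one possibility is to build it from the record itself. The following is a minimal sketch under stated assumptions: the class FlatSchemaSketch and its methods are hypothetical helpers, not part of any Kafka or Confluent API, and the record is assumed to be a flat map of primitive values (as left over after the filtering step in the question). It only produces the schema text; how that schema is attached to the outgoing record (for example through Confluent's JsonSchemaUtils.envelope, which is worth checking in the documentation for your version) is outside this sketch.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: derives a draft-07 JSON Schema string from a flat record.
class FlatSchemaSketch {

    // Maps a Java value to the matching JSON Schema type name.
    static String jsonType(Object value) {
        if (value == null) return "null";
        if (value instanceof Boolean) return "boolean";
        if (value instanceof Integer || value instanceof Long
                || value instanceof BigInteger) return "integer";
        if (value instanceof Number) return "number";
        if (value instanceof String) return "string";
        throw new IllegalArgumentException(
                "Not a primitive JSON value: " + value.getClass().getName());
    }

    // Builds the schema text, one property per map entry.
    // Note: keys and title are not JSON-escaped here; this is a sketch,
    // a real implementation should build the schema with a JSON library.
    static String inferSchema(String title, Map<String, Object> record) {
        StringJoiner props = new StringJoiner(",");
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            props.add("\"" + entry.getKey() + "\":{\"type\":\""
                    + jsonType(entry.getValue()) + "\"}");
        }
        return "{\"$schema\":\"http://json-schema.org/draft-07/schema#\","
                + "\"title\":\"" + title + "\","
                + "\"type\":\"object\","
                + "\"properties\":{" + props + "}}";
    }
}
```

For a record with an integer id and a string name, inferSchema would list both under "properties" with the types "integer" and "string", which is the information missing from the empty schema shown in the question. Inferring types from a single record is a design shortcut: a field that is null or absent in one record would yield a different schema, so a schema maintained by hand may be the more stable choice.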
Upvotes: 0