H S

Reputation: 21

Kafka Connect custom transform to convert schema-less JSON to Avro

I'm trying to build a system that reads schema-less JSON data from Kafka, converts it to Avro, and pushes it to S3.

I have been able to achieve the JSON-to-Avro conversion using Kafka Streams and KSQL, but I was wondering whether the same thing is possible using Kafka Connect's custom transforms.

This is what I have tried so far:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class JsontoAvroConverter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Transform Payload to Custom Format";
    private static final String PURPOSE = "transforming payload";
    public static final ConfigDef CONFIG_DEF = new ConfigDef();
    @Override
    public void configure(Map<String, ?> props) {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    @Override
    public R apply(R record) {

        // producer-style serializer settings; built here but never actually used by the transform
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");

        avro_Schema updatedSchema = makeUpdatedSchema();

        return newRecord(record, updatedSchema);
    }

    private avro_Schema makeUpdatedSchema() {
        avro_Schema.Builder avro_record = avro_Schema.newBuilder()
                .setName("test")
                .setTry$(1);

        return avro_record.build();
    }

    protected Object operatingValue(R record) {
        return record.value();
    }

    protected R newRecord(R record, avro_Schema updatedSchema) {
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
    }
}

Here avro_Schema is the class generated from the schema defined in my .avsc file.

I am not sure whether this is the right approach, but the problem I am facing is that when newRecord() is called, it expects updatedSchema to be of Connect's Schema type, whereas I'm passing it my custom avro_Schema type.

Also, the avro_record.build() that I'm saving into updatedSchema is not really the schema but the transformed record itself. However, I cannot pass just the record topic, the key (= null) and the updated record to newRecord(); it expects the schema and the value separately.

My questions are:

  1. Is it even possible to convert JSON to Avro using Kafka Connect alone, without Kafka Streams or KSQL? Both of those alternatives require an independent service to be set up.
  2. How do I pass a custom Avro schema to the newRecord() function and then provide the data separately?

My apologies if this has already been answered; I did go through some other questions, but none of them seemed to address my doubts. Let me know if you need any other details. Thank you!

Upvotes: 1

Views: 2219

Answers (1)

H S

Reputation: 21

The Kafka Connect custom transform only needs to attach a schema to the incoming JSON; the sink property format.class=io.confluent.connect.s3.format.avro.AvroFormat takes care of the rest.
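
For reference, a minimal sketch of the sink configuration this setup relies on (the topic, bucket, region and the transform's package are placeholders, not values from the original setup):

    name=s3-avro-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    topics=my-json-topic
    # schema-less JSON: the value reaches the transform as a Map
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    # the custom SMT from this answer attaches the Connect schema
    transforms=jsontoavro
    transforms.jsontoavro.type=com.example.JsontoAvroConverter
    # AvroFormat serializes the resulting Struct as Avro objects in S3
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    storage.class=io.confluent.connect.s3.storage.S3Storage
    s3.bucket.name=my-bucket
    s3.region=us-east-1
    flush.size=1000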

Without a schema the record value is a Map, and with a schema it becomes a Struct. I had to modify my code as below:

    // Uses org.apache.kafka.connect.data.{Field, Schema, SchemaBuilder, Struct}
    // and the static import org.apache.kafka.connect.transforms.util.Requirements.requireMap.
    @Override
    public R apply(R record) {
        // with schemas disabled in the converter, the value arrives as a Map
        final Map<String, ?> value = requireMap(record.value(), PURPOSE);
        Schema updatedSchema = makeUpdatedSchema();

        // copy each field declared in the schema from the Map into a Struct
        final Struct updatedValue = new Struct(updatedSchema);
        for (Field field : updatedSchema.fields()) {
            updatedValue.put(field.name(), value.get(field.name()));
        }

        return newRecord(record, updatedSchema, updatedValue);
    }

    private Schema makeUpdatedSchema() {
        final SchemaBuilder builder = SchemaBuilder.struct()
                .name("json_schema")
                .field("name", Schema.STRING_SCHEMA)
                .field("try", Schema.INT64_SCHEMA);

        return builder.build();
    }

    // newRecord now takes the transformed value too, replacing the
    // two-argument version from the question
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
    }
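
Note that the Struct copy above only carries the fields declared in makeUpdatedSchema(); any other keys in the incoming JSON are silently dropped, and each declared type has to be compatible with the Java type already in the Map, otherwise Struct.put() throws a DataException.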

Thanks @OneCricketeer for clarifying my doubts!

Upvotes: 0
