Eric Broda

Reputation: 7231

Converting an arbitrary JSON string to Kafka Schema

I am using Kafka Connect successfully, but I have a JSON string that I am trying to convert to a Kafka Connect Schema in a common, consistent way. Is there a way to convert an arbitrary JSON string into a "SourceRecord" that Kafka Connect can consume? I suspect there is a simple way to do this, but so far I have not been able to find a working example that does what I am looking for.

Right now I can successfully convert the JSON string on a case-by-case basis, as in the example that follows. The example is trivial, but is there a way to do this for any arbitrarily complex JSON string and create the Schema output (key and value) required by SourceRecord?

(Note: this is done in the "poll()" method of a Kafka Connect source plugin.)

:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;

records.add(new SourceRecord(
                    sourcePartition, sourceOffset, topic, partition,
                    keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id) {
    return new Struct(HttpSourceSchemas.KEY_SCHEMA)
                .put(HttpSourceSchemas.ID_FIELD, id);
}

private Struct buildValue(String timestamp, String data) {
    return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
                .put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
                .put(HttpSourceSchemas.DATA_FIELD, data);
}
:
:

My schemas look like this:

public final class HttpSourceSchemas {

    private HttpSourceSchemas() {
        // Empty
    }

    public static final String ID_FIELD = "id";
    public static final String TIMESTAMP_FIELD = "timestamp";
    public static final String DATA_FIELD = "data";

    public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
        .name("Key Schema")
        .version(1)
        .field(ID_FIELD, Schema.INT64_SCHEMA)
        .build();

    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("Value Schema")
        .version(1)
        .field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
        .field(DATA_FIELD, Schema.STRING_SCHEMA)
        .build();
}

Upvotes: 0

Views: 1812

Answers (1)

Wenli Wan

Reputation: 711

  1. Pick a JSON-processing library such as Jackson (recommended, as it already ships with Kafka Connect) or Gson.
  2. Parse the arbitrary string into a JSON object in Java (for example, a Jackson JsonNode).
  3. Do a depth-first search to traverse the nodes of the JSON object.
  4. Build the corresponding Kafka Connect Schema from the node types as you go.
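
A minimal sketch of steps 2 to 4, assuming jackson-databind and the Kafka Connect connect-api classes are on the classpath. The class JsonSchemaInference and its methods are hypothetical names, and the type mapping is an assumption rather than anything Kafka prescribes: every field is optional, integral numbers become INT64, other numbers FLOAT64, JSON null and empty arrays fall back to optional strings, and an array's element type is taken from its first element. It makes two depth-first passes: one infers the Schema, the second builds a value (a Struct for each JSON object) that conforms to it.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: infers a Connect Schema from arbitrary JSON text and
// builds a matching value by depth-first traversal of the Jackson tree.
public final class JsonSchemaInference {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonSchemaInference() { }

    /** Parses the JSON text and returns the inferred schema plus a value conforming to it. */
    public static SchemaAndValue convert(String json) throws IOException {
        JsonNode root = MAPPER.readTree(json);
        Schema schema = inferSchema(root);
        return new SchemaAndValue(schema, toValue(root, schema));
    }

    /** First pass: a node's schema is built from the schemas of its children. */
    static Schema inferSchema(JsonNode node) {
        switch (node.getNodeType()) {
            case OBJECT: {
                SchemaBuilder builder = SchemaBuilder.struct().optional();
                Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
                while (fields.hasNext()) {
                    Map.Entry<String, JsonNode> field = fields.next();
                    builder.field(field.getKey(), inferSchema(field.getValue()));
                }
                return builder.build();
            }
            case ARRAY: {
                // Assumption: all elements share the first element's schema.
                Schema element = node.size() > 0
                        ? inferSchema(node.get(0))
                        : Schema.OPTIONAL_STRING_SCHEMA;
                return SchemaBuilder.array(element).optional().build();
            }
            case STRING:
                return Schema.OPTIONAL_STRING_SCHEMA;
            case BOOLEAN:
                return Schema.OPTIONAL_BOOLEAN_SCHEMA;
            case NUMBER:
                return node.isIntegralNumber()
                        ? Schema.OPTIONAL_INT64_SCHEMA
                        : Schema.OPTIONAL_FLOAT64_SCHEMA;
            default:
                // JSON null: the real type is unknown, so fall back to an optional string.
                return Schema.OPTIONAL_STRING_SCHEMA;
        }
    }

    /** Second pass: convert each node into the Java type Connect expects for its schema. */
    static Object toValue(JsonNode node, Schema schema) {
        if (node == null || node.isNull()) {
            return null; // allowed because every inferred schema is optional
        }
        switch (schema.type()) {
            case STRUCT: {
                Struct struct = new Struct(schema);
                for (Field field : schema.fields()) {
                    struct.put(field, toValue(node.get(field.name()), field.schema()));
                }
                return struct;
            }
            case ARRAY: {
                List<Object> items = new ArrayList<>();
                for (JsonNode item : node) {
                    items.add(toValue(item, schema.valueSchema()));
                }
                return items;
            }
            case INT64:
                return node.asLong();
            case FLOAT64:
                return node.asDouble();
            case BOOLEAN:
                return node.asBoolean();
            case STRING:
                return node.isTextual() ? node.asText() : node.toString();
            default:
                throw new DataException("Unsupported schema type: " + schema.type());
        }
    }
}
```

In the question's poll() method, the result of JsonSchemaInference.convert(json) could supply the valueSchema and value arguments of new SourceRecord(...) via its schema() and value() accessors. Because the schema is inferred per message, two messages with different fields produce different schemas, and arrays whose later elements carry fields the first element lacks will lose those fields; downstream converters (for example Avro with a schema registry) may be sensitive to this.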

Kafka's JsonConverter (org.apache.kafka.connect.json.JsonConverter) contains code that converts between JSON and Connect data, which should help you.
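
If the JSON happens to arrive in JsonConverter's envelope format ({"schema": ..., "payload": ...}), or if a schemaless record is acceptable, JsonConverter itself can do the conversion. A minimal sketch, assuming the connect-json artifact is on the classpath; JsonConverterSketch and toConnect are hypothetical names. Note that JsonConverter does not infer a schema from plain JSON: with schemas.enable set to false it returns a null schema and plain Java Maps, Lists and primitives, which a SourceRecord can carry as schemaless data if the worker's converter supports that.

```java
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

// Hypothetical helper wrapping Kafka's JsonConverter (connect-json artifact).
public final class JsonConverterSketch {

    private JsonConverterSketch() { }

    /**
     * schemasEnabled = true: the JSON must be a {"schema": ..., "payload": ...} envelope,
     * and the result carries a Connect Schema plus a matching value (e.g. a Struct).
     * schemasEnabled = false: any JSON is accepted; the schema is null and the value
     * is a schemaless Map, List or primitive.
     */
    public static SchemaAndValue toConnect(String json, boolean schemasEnabled) {
        JsonConverter converter = new JsonConverter();
        // isKey = false configures it as a value converter.
        converter.configure(Collections.singletonMap("schemas.enable", schemasEnabled), false);
        // A placeholder topic name is passed; plain JSON conversion does not need a real one.
        return converter.toConnectData("unused-topic", json.getBytes(StandardCharsets.UTF_8));
    }
}
```

In a real task the converter would normally be created and configured once and reused across poll() calls rather than rebuilt per message.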

Upvotes: 1
