Reputation: 7231
I am using Kafka Connect successfully, but I have a JSON string that I am trying to convert to a Kafka Connect Schema in a common, consistent fashion. Is there a way to convert an arbitrary JSON string into a "SourceRecord" that can be consumed by Kafka Connect? I suspect there is a simple way to do this, but so far I have not been able to find a good working example that does what I am looking for.
Right now I can successfully convert the JSON string on a case-by-case basis, as in the example below. The example shown is trivial, but I am hoping there is a way to do this for any arbitrarily complex JSON string and create the Schema output (key and value) required by SourceRecord.
(Note: this is done in a Kafka Connect plugin "poll()" method)
:
:
long l = generateId();
Long id = Long.valueOf(l);
Object key = buildKey(id);
Schema keySchema = HttpSourceSchemas.KEY_SCHEMA;
Object value = buildValue(timestampStr, "hello world");
Schema valueSchema = HttpSourceSchemas.VALUE_SCHEMA;
records.add(new SourceRecord(
        sourcePartition, sourceOffset, topic, partition,
        keySchema, key, valueSchema, value));
:
:
private Struct buildKey(Long id) {
    return new Struct(HttpSourceSchemas.KEY_SCHEMA)
            .put(HttpSourceSchemas.ID_FIELD, id);
}

private Struct buildValue(String timestamp, String data) {
    return new Struct(HttpSourceSchemas.VALUE_SCHEMA)
            .put(HttpSourceSchemas.TIMESTAMP_FIELD, timestamp)
            .put(HttpSourceSchemas.DATA_FIELD, data);
}
:
:
My schemas look like this:
public final class HttpSourceSchemas {
    private HttpSourceSchemas() {
        // Empty
    }

    public static final String ID_FIELD = "id";
    public static final String TIMESTAMP_FIELD = "timestamp";
    public static final String DATA_FIELD = "data";

    public static final Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name("Key Schema")
            .version(1)
            .field(ID_FIELD, Schema.INT64_SCHEMA)
            .build();

    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("Value Schema")
            .version(1)
            .field(TIMESTAMP_FIELD, Schema.STRING_SCHEMA)
            .field(DATA_FIELD, Schema.STRING_SCHEMA)
            .build();
}
Upvotes: 0
Views: 1812
Reputation: 711
connect-api dependency) or gson. Some code in Kafka's JsonConverter should help you.
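To make that concrete, here is a minimal sketch of one way to infer a Connect Schema from an arbitrary JSON string and build the matching value. It is not taken from JsonConverter itself. It assumes Jackson (jackson-databind) and connect-api are on the classpath; the class name JsonSchemaInferrer and its method names are hypothetical. It also makes simplifying assumptions that may not fit your data: every field is optional, all integers become INT64, all other numbers become FLOAT64, JSON nulls are typed as optional strings, and an array takes its element type from its first element.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper: infers a Connect schema from JSON and builds the value.
public final class JsonSchemaInferrer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonSchemaInferrer() {
    }

    // Parses the JSON, infers a schema from its shape, then converts the value.
    public static SchemaAndValue toConnectData(String json) throws IOException {
        JsonNode root = MAPPER.readTree(json);
        Schema schema = inferSchema(root);
        return new SchemaAndValue(schema, toValue(schema, root));
    }

    static Schema inferSchema(JsonNode node) {
        if (node.isObject()) {
            SchemaBuilder struct = SchemaBuilder.struct().optional();
            Iterator<String> names = node.fieldNames();
            while (names.hasNext()) {
                String name = names.next();
                struct.field(name, inferSchema(node.get(name)));
            }
            return struct.build();
        }
        if (node.isArray()) {
            // Assumption: arrays are homogeneous; empty arrays default to strings.
            Schema element = node.size() > 0
                    ? inferSchema(node.get(0))
                    : Schema.OPTIONAL_STRING_SCHEMA;
            return SchemaBuilder.array(element).optional().build();
        }
        if (node.isBoolean()) {
            return Schema.OPTIONAL_BOOLEAN_SCHEMA;
        }
        if (node.isIntegralNumber()) {
            // Assumption: values fit in a long; larger integers would overflow.
            return Schema.OPTIONAL_INT64_SCHEMA;
        }
        if (node.isNumber()) {
            return Schema.OPTIONAL_FLOAT64_SCHEMA;
        }
        // Text and JSON null both fall back to an optional string.
        return Schema.OPTIONAL_STRING_SCHEMA;
    }

    static Object toValue(Schema schema, JsonNode node) {
        if (node == null || node.isNull()) {
            return null;
        }
        switch (schema.type()) {
            case STRUCT: {
                Struct struct = new Struct(schema);
                for (Field field : schema.fields()) {
                    struct.put(field, toValue(field.schema(), node.get(field.name())));
                }
                return struct;
            }
            case ARRAY: {
                List<Object> items = new ArrayList<>();
                for (JsonNode item : node) {
                    items.add(toValue(schema.valueSchema(), item));
                }
                return items;
            }
            case BOOLEAN:
                return node.asBoolean();
            case INT64:
                return node.asLong();
            case FLOAT64:
                return node.asDouble();
            default:
                return node.asText();
        }
    }
}
```

In the poll() method from the question, the result could then replace the hand-built value, for example by calling JsonSchemaInferrer.toConnectData(jsonString) and passing result.schema() and result.value() to the SourceRecord constructor in place of valueSchema and value. Keep in mind that a schema inferred per message can change from record to record if the JSON shape varies, which may matter to downstream converters and sinks.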
Upvotes: 1