Reputation: 97
I am writing a Kafka connector in order to download some data from several sources on GitHub (text and yaml files) and transform them into objects of a certain class, which is automatically generated from an .avsc file:
{
  "type": "record",
  "name": "MatomoRecord",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "type", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}
So far everything has been successful. Now I have a Map of objects that I want to persist to a Kafka topic. For that I'm trying to create SourceRecords:
for (Map.Entry<String, MatomoRecord> record : records.entrySet()) {
    sourceRecords.add(new SourceRecord(
            sourcePartition,
            sourceOffset,
            matomoTopic,
            0,
            org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
            record.getKey(),
            matomoSchema,
            record.getValue())
    );
}
How can I define the value schema of type org.apache.kafka.connect.data.Schema based on the avro schema? For a test I have manually created a schema using the Builder:
Schema matomoSchema = SchemaBuilder.struct()
        .name("MatomoRecord")
        .field("name", Schema.STRING_SCHEMA)
        .field("type", Schema.STRING_SCHEMA)
        .field("timestamp", Schema.INT64_SCHEMA)
        .build();
The result was:
org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class MatomoRecord
Could somebody help me define the value schema based on the Avro schema?
Best regards, Martin
Upvotes: 2
Views: 1844
Reputation: 385
A KC Schema is a JSON schema that looks awfully like an Avro schema. Try org.apache.kafka.connect.json.JsonConverter#asConnectSchema - you may need to massage the Avro schema to make it work.
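As a minimal sketch (not from the original answer) of what that could look like: it assumes the Avro record has already been "massaged" into Connect's own JSON schema dialect, which uses "struct", "field" and "int64" instead of Avro's "record", "name" and "long". The class and method names here are made up for illustration.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

import java.util.Collections;

// Hypothetical helper, not part of the original answer
public class ConnectSchemaFromJson {
    public static Schema matomoConnectSchema() throws Exception {
        // The Avro record rewritten into Connect's JSON schema format
        String connectJsonSchema =
                "{\"type\":\"struct\",\"name\":\"MatomoRecord\",\"optional\":false,\"fields\":["
                + "{\"field\":\"name\",\"type\":\"string\",\"optional\":false},"
                + "{\"field\":\"type\",\"type\":\"string\",\"optional\":false},"
                + "{\"field\":\"timestamp\",\"type\":\"int64\",\"optional\":false}]}";

        JsonConverter converter = new JsonConverter();
        // configure() must be called before asConnectSchema(), otherwise its internal cache is unset
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

        JsonNode node = new ObjectMapper().readTree(connectJsonSchema);
        return converter.asConnectSchema(node);
    }
}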
Upvotes: 1
Reputation: 191738
You can't use record.getValue(), nor is there a direct API from Avro to a Connect Schema (without internal methods of Confluent's AvroConverter). You need to parse that object into a Struct object that matches the schema you've defined (which looks fine, assuming none of your object fields can be null).
Look at the Javadoc for how you can define it: https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html
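Here's a minimal sketch of what that could look like for the loop in the question (not verbatim from the answer; it assumes the generated MatomoRecord class exposes getName()/getType()/getTimestamp() accessors, reuses the records, sourcePartition, sourceOffset and matomoTopic variables from the question, and Struct is org.apache.kafka.connect.data.Struct):

for (Map.Entry<String, MatomoRecord> entry : records.entrySet()) {
    MatomoRecord value = entry.getValue();

    // Build a Struct that matches matomoSchema; toString() guards against the
    // Avro-generated getters returning CharSequence/Utf8 instead of String
    Struct struct = new Struct(matomoSchema)
            .put("name", value.getName().toString())
            .put("type", value.getType().toString())
            .put("timestamp", value.getTimestamp());

    sourceRecords.add(new SourceRecord(
            sourcePartition,
            sourceOffset,
            matomoTopic,
            0,
            Schema.STRING_SCHEMA,
            entry.getKey(),
            matomoSchema,
            struct));
}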
Note (not relevant here): nested structs should be built from the "bottom up", where you put child structs / arrays into parent ones.
Your connector should not necessarily depend on Avro other than to include your model objects. The Converter interfaces are responsible for converting your Struct with its Schema into other data formats (JSON, Confluent's Avro encoding, Protobuf, etc.)
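For instance, the wire format is chosen in the worker/connector configuration rather than inside your source task; a sketch like the following (placeholder registry URL) would make Connect serialize your Struct as Confluent Avro on the way out:

# Connector/worker config (illustrative values, not from the original answer)
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081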
Upvotes: 2