biomartin

Reputation: 97

Building a new SourceRecord from an Object

I am writing a Kafka Connect source connector that downloads data (text and YAML files) from several sources on GitHub and transforms it into objects of a class that is generated automatically from an .avsc file:

{
  "type": "record",
  "name": "MatomoRecord",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "type", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}

So far this works. I now have a Map of these objects, which I want to persist in a Kafka topic. To do that, I'm trying to create SourceRecords:

for (Map.Entry<String, MatomoRecord> record : records.entrySet()) {
  sourceRecords.add(new SourceRecord(
    sourcePartition,
    sourceOffset,
    matomoTopic,
    0,
    org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
    record.getKey(),
    matomoSchema,
    record.getValue())
  );
}

How can I define the value schema, of type org.apache.kafka.connect.data.Schema, based on the Avro schema? As a test, I created a schema manually using the SchemaBuilder:

Schema matomoSchema = SchemaBuilder.struct()
                .name("MatomoRecord")
                .field("name", Schema.STRING_SCHEMA)
                .field("type", Schema.STRING_SCHEMA)
                .field("timestamp", Schema.INT64_SCHEMA)
                .build();

The result was:

org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class MatomoRecord

Could somebody help me define the value schema based on the Avro schema?

Best regards, Martin

Upvotes: 2

Views: 1844

Answers (2)

Liam Clarke

Reputation: 385

A Kafka Connect Schema is a JSON schema that looks a lot like an Avro schema. Try org.apache.kafka.connect.json.JsonConverter#asConnectSchema; you may need to massage the Avro schema to make it work.

Upvotes: 1

OneCricketeer

Reputation: 191738

You can't pass record.getValue() directly, and there is no direct API for converting an Avro schema to a Connect Schema (short of using internal methods of Confluent's AvroConverter).

You need to convert that object into a Struct that matches the schema you've defined (which looks fine, assuming none of your object's fields can be null).

See the Javadoc for how to build one: https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html
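
As a rough sketch of that conversion, assuming the connect-api jar is on the classpath: the schema below is the one from the question, while the class name MatomoStructExample and the helper toStruct are hypothetical names made up for illustration. The helper takes plain values so the example stays self-contained; with the generated class you would presumably call it as toStruct(r.getName().toString(), r.getType().toString(), r.getTimestamp()), assuming the usual Avro-generated getters.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class MatomoStructExample {

    // Same Connect schema as in the question; all fields are required (non-null).
    static final Schema MATOMO_SCHEMA = SchemaBuilder.struct()
            .name("MatomoRecord")
            .field("name", Schema.STRING_SCHEMA)
            .field("type", Schema.STRING_SCHEMA)
            .field("timestamp", Schema.INT64_SCHEMA)
            .build();

    // Hypothetical helper: copies the model object's field values into a Struct.
    // Each put() checks the value against the field's schema.
    static Struct toStruct(String name, String type, long timestamp) {
        return new Struct(MATOMO_SCHEMA)
                .put("name", name)
                .put("type", type)
                .put("timestamp", timestamp); // autoboxed to Long, as INT64 expects
    }

    public static void main(String[] args) {
        Struct value = toStruct("example", "yaml", 1234L);
        value.validate(); // throws DataException if a required field is missing
        System.out.println(value);
    }
}
```

The resulting Struct, not the MatomoRecord itself, is what goes into the last argument of the SourceRecord constructor, alongside matomoSchema as the value schema.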

Note (not relevant here), nested structs should be built from the "bottom up", where you put child structs / arrays into parent ones.
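
For illustration only, a minimal sketch of the bottom-up approach. The nested "source" field, both schema names, and the class NestedStructExample are invented for this example and are not part of the question's model.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class NestedStructExample {

    // Hypothetical child schema, defined first so the parent can reference it.
    static final Schema SOURCE_SCHEMA = SchemaBuilder.struct()
            .name("SourceInfo")
            .field("repository", Schema.STRING_SCHEMA)
            .field("path", Schema.STRING_SCHEMA)
            .build();

    // Hypothetical parent schema that embeds the child schema as a field.
    static final Schema PARENT_SCHEMA = SchemaBuilder.struct()
            .name("RecordWithSource")
            .field("name", Schema.STRING_SCHEMA)
            .field("source", SOURCE_SCHEMA)
            .build();

    static Struct build(String name, String repository, String path) {
        // Build the child Struct first...
        Struct source = new Struct(SOURCE_SCHEMA)
                .put("repository", repository)
                .put("path", path);
        // ...then put it into the parent.
        return new Struct(PARENT_SCHEMA)
                .put("name", name)
                .put("source", source);
    }

    public static void main(String[] args) {
        Struct parent = build("example", "some/repo", "config.yaml");
        parent.validate();
        System.out.println(parent.getStruct("source").getString("path"));
    }
}
```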

Your connector does not necessarily need to depend on Avro, other than to include your model objects. The Converter implementations are responsible for converting your Struct and its Schema into other data formats (JSON, Confluent's Avro encoding, Protobuf, etc.).

Upvotes: 2
