Reputation: 1944
I am trying to ingest data from an Oracle database using Kafka Connect. The default object that Kafka connectors provide is of the "GenericRecord" type. This is too generic and leads to a situation where I have to get the data by calling record.getAsString("someIDENTIFIER"). Is there a possibility of getting a particular type of object instead of the GenericRecord type?
Upvotes: 0
Views: 762
Reputation: 7187
Kafka Connect source connectors work with SourceRecord objects, and the Kafka Connect worker is configured to use a converter that serializes the SourceRecord into a binary form that is then written to the Kafka topic. Kafka Connect ships with a JSON converter, and Confluent provides an Avro converter. So, the binary form of the message written to Kafka depends on which converter you're using.
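For example, a minimal sketch of the relevant worker properties when using Confluent's Avro converter; the Schema Registry URL is a placeholder for your own:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081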
(Likewise, sink connectors work with SinkRecord objects, and the Kafka Connect worker uses its converter to deserialize the binary form of the message read from Kafka into a SinkRecord object that the connector deals with.)
It sounds like you're writing a Kafka consumer and are seeing GenericRecord objects there. If so, then you've likely configured the Kafka Connect worker to use Confluent's Avro converter, which for source connectors like the JDBC connector converts each SourceRecord into an Avro binary format that Kafka Connect then writes to the Kafka topic. Your client is then likely using a Kafka consumer configured with an Avro deserializer, and unless you give the deserializer a specific Avro schema to work with, it will deserialize each Avro-encoded message into an Avro GenericRecord.
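As a sketch of that setup, here's what such a consumer might look like using Confluent's Avro deserializer without a specific reader class; the bootstrap servers, group id, topic name, and Schema Registry URL are all placeholders:
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");
        // Confluent's Avro deserializer looks up the writer's schema in the Schema Registry
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        KafkaConsumer<Object, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("oracle-table-topic"));
        for (ConsumerRecord<Object, GenericRecord> record : consumer.poll(1000)) {
            // Without a schema-generated class, each field is looked up by name
            Object id = record.value().get("someIDENTIFIER");
            System.out.println(id);
        }
        consumer.close();
    }
}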
However, you can configure your application to be aware of a specific version of the Avro schema, and have your build system generate a class from that version of the schema; that generated code deserializes the Avro-encoded message into an in-memory form described by the schema. In Java, this means you'd generate the class from the schema and then use that generated class in your code to copy the GenericRecord into an instance of your class. See this complete consumer example, and specifically this line for the conversion from the GenericRecord. In that example, LogLine is the class generated from the Avro schema:
// Read the Avro-encoded message as a GenericRecord
GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message();
// Deep-copy the generic record into the schema-generated LogLine class
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent);
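If you're using the newer KafkaConsumer API as in the sketch above, Confluent's Avro deserializer can also skip the manual deepCopy step entirely: setting its specific.avro.reader property makes it return instances of the generated class directly. For example, adding to the hypothetical consumer properties sketched earlier:
// Return schema-generated classes (e.g., LogLine) instead of GenericRecord
props.put("specific.avro.reader", "true");
KafkaConsumer<Object, LogLine> consumer = new KafkaConsumer<>(props);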
One significant benefit of Avro is that it directly supports schema evolution, and Confluent's Schema Registry takes advantage of that. The source connector might evolve the generated Avro schema for a table when the table's structure changes in the database, but as long as those changes keep the Avro schemas backward and/or forward compatible, the Avro libraries your client application uses will automatically convert from the writer's schema of the messages into the Avro schema your application uses.
Of course, at some point you'd change your application to use the new Avro schema, but that doesn't have to happen at the same time. In fact, if you configure the Schema Registry to enforce that schema versions are both forward and backward compatible, you can change your client application before or after the database is changed and the JDBC source connector starts using a new version of the Avro schema.
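For reference, a sketch of enforcing full (backward and forward) compatibility for a subject via the Schema Registry's REST API; the subject name and URL here are placeholders:
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL"}' \
  http://localhost:8081/config/my-oracle-table-value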
Upvotes: 3