Reputation: 43
I am only able to insert simple objects into the database using Confluent Kafka Connect, and I am not sure how to make this support a complex JSON/schema structure, or whether this feature is available at all. There is a similar question here asked about a year ago, but it has not been answered yet. Please help.
Upvotes: 4
Views: 4473
Reputation: 7187
Kafka Connect does support complex structures, including Struct, Map, and Array. Generally only source connectors need to do this, as sink connectors are handed the values and simply need to use them. This documentation describes the basics of building up a Schema object that describes a Struct, and then creating a Struct instance that adheres to that schema. In this case, the example struct is just a flat structure.
However, you can easily add fields of type Struct that are defined with another Schema instance. In effect, it's just layering this simple pattern into multiple levels in your structs:
Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
    .field("number", Schema.INT16_SCHEMA)
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .build();

Schema personSchema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT8_SCHEMA)
    .field("admin", SchemaBuilder.bool().defaultValue(false).build())
    .field("address", addressSchema)
    .build();
Struct addressStruct = new Struct(addressSchema)
    .put("number", (short) 100)   // INT16 validates against a Java short
    .put("street", "Main Street")
    .put("city", "Springfield");

Struct personStruct = new Struct(personSchema)
    .put("name", "Barbara Liskov")
    .put("age", (byte) 75)        // INT8 validates against a Java byte
    .put("address", addressStruct);
Because the SchemaBuilder is a fluent API, you can actually embed it just like the custom admin boolean schema builder. But that's a bit harder, since you need to reference the Schema to create the addressStruct.
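To illustrate that trade-off, here is a sketch of the inline style, where the nested schema has to be looked back up from the parent before the nested Struct can be built (NAME and ADDRESS are the same placeholder constants as above):

```java
// Define the address schema inline within the person schema.
Schema personSchema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("address", SchemaBuilder.struct().name(ADDRESS)
        .field("street", Schema.STRING_SCHEMA)
        .field("city", Schema.STRING_SCHEMA)
        .build())
    .build();

// With no variable holding the nested schema, you must retrieve it
// from the parent to construct the nested Struct -- which is why a
// separate addressSchema variable is often the clearer choice.
Schema addressSchema = personSchema.field("address").schema();
Struct addressStruct = new Struct(addressSchema)
    .put("street", "Main Street")
    .put("city", "Springfield");
```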
Generally you only have to worry about how to do this when writing a source connector. If you're trying to use an existing source connector, you likely have very little control over the structure of the keys and values. For example, Confluent's JDBC source connector models each table with a separate Schema and each row in that table as a separate Struct that uses that schema. But since rows are flat, the Schema and Struct will only contain fields with primitive types.
Debezium's CDC connectors for MySQL and PostgreSQL also model a relational table with a Schema and corresponding Struct objects for each row, but CDC captures more information about the row, such as the state of the row before and/or after the change. Consequently, these connectors use a more complex Schema for each table that involves nested Struct objects.
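As a rough sketch of that shape (the field and schema names loosely follow Debezium's envelope, but the exact details vary by connector and version):

```java
// Sketch of a CDC-style envelope: the row schema is nested twice,
// once for the "before" state and once for the "after" state.
Schema rowSchema = SchemaBuilder.struct().name("server.db.table.Value")
    .optional()                       // before/after may be absent
    .field("id", Schema.INT32_SCHEMA)
    .field("name", Schema.STRING_SCHEMA)
    .build();

Schema envelopeSchema = SchemaBuilder.struct().name("server.db.table.Envelope")
    .field("before", rowSchema)       // null for inserts
    .field("after", rowSchema)        // null for deletes
    .field("op", Schema.STRING_SCHEMA)
    .build();
```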
Note that while each source connector will have its own flavor of message structures, Kafka Connect's Single Message Transforms (SMTs) make it pretty easy to filter, rename, and make slight modifications to the messages produced by a source connector before they are written to Kafka, or to the messages read from Kafka before they are sent to a sink connector.
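For example, renaming a field in a connector's output takes only a few lines of configuration using the built-in ReplaceField transform (the transform alias and field names here are illustrative):

```properties
transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=address:home_address
```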
Upvotes: 4