sujeesh valath

Reputation: 43

Is complex or nested JSON/schema supported by Confluent Kafka Connect?

I am only able to insert simple objects into the database using Confluent Kafka Connect, and I'm not sure how to make it support a complex/nested JSON schema structure, or whether this feature is available at all. There is a similar question here asked about a year ago, but it has not been answered yet. Please help.

Upvotes: 4

Views: 4473

Answers (1)

Randall Hauch

Reputation: 7187

Kafka Connect does support complex structures, including Struct, Map, and Array. Generally only source connectors need to do this, as sink connectors are handed the values and simply need to use them. This documentation describes the basics of building up a Schema object that describes a Struct, and then creating a Struct instance that adheres to that schema. In this case, the example struct is just a flat structure.

However, you can easily add fields of type Struct that are defined with another Schema instance. In effect, it's just layering this simple pattern into multiple levels in your structs:

Schema addressSchema = SchemaBuilder.struct().name("Address")
    .field("number", Schema.INT16_SCHEMA)
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .build();
Schema personSchema = SchemaBuilder.struct().name("Person")
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT8_SCHEMA)
    .field("admin", SchemaBuilder.bool().defaultValue(false).build())
    .field("address", addressSchema)
    .build();

// Note: Struct has no build() method; put(...) returns the Struct for chaining,
// and the supplied values must match the schema types (INT16 -> short, INT8 -> byte).
Struct addressStruct = new Struct(addressSchema)
    .put("number", (short) 100)
    .put("street", "Main Street")
    .put("city", "Springfield");
Struct personStruct = new Struct(personSchema)
    .put("name", "Barbara Liskov")
    .put("age", (byte) 75)
    .put("address", addressStruct);

Because SchemaBuilder is a fluent API, you can embed a nested builder inline, just like the admin boolean schema builder above. That's a bit more awkward, though, because you still need a reference to the address Schema when creating the addressStruct.
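An inline version of the same person schema might look like the following sketch (names reused from above); the nested schema can be recovered afterwards via a field lookup:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// The address schema is declared inline where its field is declared.
Schema personSchema = SchemaBuilder.struct().name("Person")
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT8_SCHEMA)
    .field("admin", SchemaBuilder.bool().defaultValue(false).build())
    .field("address", SchemaBuilder.struct().name("Address")
        .field("number", Schema.INT16_SCHEMA)
        .field("street", Schema.STRING_SCHEMA)
        .field("city", Schema.STRING_SCHEMA)
        .build())
    .build();

// Without a named variable for the address schema, look it up
// from the parent schema to build the nested Struct:
Schema addressSchema = personSchema.field("address").schema();
Struct addressStruct = new Struct(addressSchema)
    .put("number", (short) 100)
    .put("street", "Main Street")
    .put("city", "Springfield");
```

This is why keeping a separate variable per nested schema, as in the example above, is usually easier to read once structures go more than one level deep.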

Generally you only have to worry about this when writing a source connector; if you're using an existing source connector, you likely have very little control over the structure of the keys and values. For example, Confluent's JDBC source connector models each table with a separate Schema and each row in that table as a separate Struct that uses that schema. Since rows are flat, though, those Schema and Struct objects only contain fields with primitive types.

Debezium's CDC connectors for MySQL and PostgreSQL also model a relational table with a Schema and corresponding Struct objects for each row, but CDC captures more information about the row, such as its state before and/or after the change. Consequently, these connectors use a more complex Schema for each table that involves nested Struct objects.
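As a rough, hypothetical sketch of that idea (simplified; Debezium's real envelope carries additional metadata such as source and timestamp fields), the per-table schema nests the row schema twice, once for the state before the change and once after:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Hypothetical row schema for one table; marked optional because
// "before" is absent for inserts and "after" is absent for deletes.
Schema rowSchema = SchemaBuilder.struct().name("Customer").optional()
    .field("id", Schema.INT32_SCHEMA)
    .field("email", Schema.STRING_SCHEMA)
    .build();

// Simplified change-event envelope that reuses the row schema twice.
Schema envelopeSchema = SchemaBuilder.struct().name("CustomerEnvelope")
    .field("before", rowSchema)
    .field("after", rowSchema)
    .field("op", Schema.STRING_SCHEMA)
    .build();

// An update event: both states are present.
Struct before = new Struct(rowSchema).put("id", 1).put("email", "old@example.com");
Struct after  = new Struct(rowSchema).put("id", 1).put("email", "new@example.com");
Struct envelope = new Struct(envelopeSchema)
    .put("before", before)
    .put("after", after)
    .put("op", "u");
```

The point is simply that the same Schema-plus-Struct layering shown earlier scales to these richer change-event shapes.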

Note that while each source connector has its own flavor of message structure, Kafka Connect's Single Message Transforms (SMTs) make it pretty easy to filter, rename, and make slight modifications to the messages produced by a source connector before they are written to Kafka, or to the messages read from Kafka before they are sent to a sink connector.
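For instance, a connector configuration might apply the built-in ReplaceField transform to rename a field before the connector sees it (the transform alias and field names here are illustrative):

```properties
# Illustrative SMT config: rename the "address" field to "mailing_address"
# in the record value using Kafka Connect's built-in ReplaceField transform.
transforms=renameAddress
transforms.renameAddress.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameAddress.renames=address:mailing_address
```

Transforms are applied in the order listed in `transforms`, so several small SMTs can be chained without writing any custom code.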

Upvotes: 4
