repcak
repcak

Reputation: 142

How to handle nested arrays of struct in Kafka JDBC Sink Connector

I'm trying to handle nested arrays of struct via my Kafka Connect JDBC Sink Connector while writing records to Postgres.

Im using Kafka Connect (MSK Connect) build in AWS MSK. (I cannot use KSQL or Kafka Streams right now, or can i? run kafka streams on https://aws.amazon.com/blogs/big-data/power-your-kafka-streams-application-with-amazon-msk-and-aws-fargate/)

My schema looks like follows:

{
"schema": {
    "type": "struct",
    "fields": [
        {
            "type": "struct",
            "optional": "false",
            "field": "x",
            "fields": [
                {
                    "type": "string",
                    "optional": "true",
                    "field": "y"
                },
                {
                    "type": "string",
                    "optional": "true",
                    "field": "z"
                },
                {
                    "type": "array",
                    "items": {
                        "type": "struct",
                        "fields": [
                            {
                                "type": "int32",
                                "optional": true,
                                "field": "list-id"
                            },
                            {
                                "type": "string",
                                "optional": true,
                                "field": "list-no"
                            }
                        ],
                        "optional": true
                    },
                    "optional": "true",
                    "field": "mylist"
                }
            ]
        }
    ],
    "optional": "true",
    "name": "Name"
},
"payload": {
    "x": {
        "y": "abc",
        "z": "abc",
        "mylist": [
            {
                "list-id": 5080866,
                "list-no": "1"
            },
            {
                "list-id": 3111,
                "list-no": "5"
            }
        ]
    }
}

}

And my kafka connect config looks like:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://HOST:5432/database
topics=testTopic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.user=user
connection.password=password
auto.create=true
auto.evolve=true
insert.mode=insert
batch.size=200
tasks.max=1
table.name.format=test
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value

I've got an error:

Caused by: org.apache.kafka.connect.errors.DataException: Flatten transformation does not support ARRAY for record with schemas (for field Name.x.mylist)

Is there a way to deal with this using Kafka Connect?

Upvotes: 1

Views: 1304

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191738

Should i create own custom SMT? Is this the only way?

Ultimately, yes, that is "one" way, but not very flexible, as SMT require a specific data format.

You could also use Kafka Streams (in ECS/Fargate, yes), Spark, Flink (in EMR), etc to "massage" your data before using Kafka Connect (or Spark/Flink can also write to JDBC databases).

Upvotes: 1

Related Questions