Reputation: 142
I'm trying to handle nested arrays of structs in my Kafka Connect JDBC Sink Connector while writing records to Postgres.
I'm using Kafka Connect (MSK Connect) on AWS MSK. I don't think I can use KSQL or Kafka Streams right now. Or can I, by running Kafka Streams as described in https://aws.amazon.com/blogs/big-data/power-your-kafka-streams-application-with-amazon-msk-and-aws-fargate/?
My schema looks as follows:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "optional": "false",
        "field": "x",
        "fields": [
          {
            "type": "string",
            "optional": "true",
            "field": "y"
          },
          {
            "type": "string",
            "optional": "true",
            "field": "z"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "int32",
                  "optional": true,
                  "field": "list-id"
                },
                {
                  "type": "string",
                  "optional": true,
                  "field": "list-no"
                }
              ],
              "optional": true
            },
            "optional": "true",
            "field": "mylist"
          }
        ]
      }
    ],
    "optional": "true",
    "name": "Name"
  },
  "payload": {
    "x": {
      "y": "abc",
      "z": "abc",
      "mylist": [
        {
          "list-id": 5080866,
          "list-no": "1"
        },
        {
          "list-id": 3111,
          "list-no": "5"
        }
      ]
    }
  }
}
My Kafka Connect config looks like this:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://HOST:5432/database
topics=testTopic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.user=user
connection.password=password
auto.create=true
auto.evolve=true
insert.mode=insert
batch.size=200
tasks.max=1
table.name.format=test
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
I get this error:
Caused by: org.apache.kafka.connect.errors.DataException: Flatten transformation does not support ARRAY for record with schemas (for field Name.x.mylist)
Is there a way to deal with this using Kafka Connect?
Upvotes: 1
Views: 1304
Reputation: 191738
Should I create my own custom SMT? Is this the only way?
Ultimately, yes, that is "one" way, though not the only one, and it is not very flexible, since an SMT is written against a specific data format.
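To illustrate the kind of per-record logic such a custom SMT might apply, here is a minimal sketch that replaces every array with its JSON text, so that one message still maps to one row. It is written in plain Python only to show the idea; a real SMT would be a Java class implementing Connect's Transformation interface, and for records with schemas it would also have to rewrite the field's schema from array to string, which is not shown. The function name stringify_arrays is hypothetical, and the payload is copied from the question.

```python
import json


def stringify_arrays(value):
    """Return a copy of `value` in which every list is replaced by its JSON text.

    Hypothetical helper: it mirrors what a custom SMT could do to the record
    value, it is not part of Kafka Connect.
    """
    if isinstance(value, list):
        # The whole array (including nested structs) becomes one string value.
        return json.dumps(value, separators=(",", ":"))
    if isinstance(value, dict):
        # Recurse into nested structs and rebuild them without mutating the input.
        return {key: stringify_arrays(item) for key, item in value.items()}
    return value


# Payload taken from the question.
payload = {
    "x": {
        "y": "abc",
        "z": "abc",
        "mylist": [
            {"list-id": 5080866, "list-no": "1"},
            {"list-id": 3111, "list-no": "5"},
        ],
    }
}

result = stringify_arrays(payload)
print(result["x"]["mylist"])
```

The trade-off is that mylist would land in Postgres as a single text column, so the format-specific knowledge (which fields are arrays, how to encode them) is baked into the transform.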
You could also use Kafka Streams (in ECS/Fargate, yes), Spark, or Flink (in EMR), etc., to "massage" your data before it reaches Kafka Connect (Spark and Flink can also write to JDBC databases directly).
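As a rough sketch of what that "massaging" step could compute, the snippet below turns the payload from the question into one flat record per element of mylist, which is a shape the JDBC sink can write as plain columns. It is pure Python with no Kafka client, so it only shows the transformation itself; the function names, the "_" separator, and the choice to emit one row per array element are assumptions, not something the connector prescribes.

```python
def flatten(record, prefix="", sep="_"):
    """Flatten nested dicts into a single-level dict with joined key names."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


def explode_rows(payload, struct_field="x", array_field="mylist", sep="_"):
    """Return one flat row per array element, repeating the scalar fields.

    Hypothetical helper; a record whose array is missing or empty yields no rows.
    """
    parent = dict(payload[struct_field])          # shallow copy, input stays intact
    items = parent.pop(array_field, None) or []
    base = flatten({struct_field: parent}, sep=sep)
    rows = []
    for item in items:
        row = dict(base)
        row.update(flatten({struct_field: {array_field: item}}, sep=sep))
        rows.append(row)
    return rows


# Payload taken from the question.
payload = {
    "x": {
        "y": "abc",
        "z": "abc",
        "mylist": [
            {"list-id": 5080866, "list-no": "1"},
            {"list-id": 3111, "list-no": "5"},
        ],
    }
}

rows = explode_rows(payload)
for row in rows:
    print(row)
```

In a Kafka Streams, Spark, or Flink job, the same logic would run per message and write the resulting rows to a new topic (or straight to the database), and the sink connector would then read that topic without needing Flatten at all.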
Upvotes: 1