Reputation: 319
I'm trying to read a CSV file and send it as JSON to a Kafka topic. I started from the connector example on the CSV Source Connector for Confluent Platform page, and I'm using a local Confluent installation. A basic example works fine. My sample CSV data is:
name,street,city
Homer Simpson,742 Evergreen Terrace,Springfield
With that, I was able to read JSON like this from the topic:
{
"name": "Homer Simpson",
"street": "742 Evergreen Terrace",
"number": "Springfield"
}
Now I need to translate this CSV line into the following JSON:
{
"name": "Homer Simpson",
"address": {
"street": "742 Evergreen Terrace",
"number": "Springfield"
}
}
Here is the source connector configuration I'm using:
{
"name": "NestedExample",
"config": {
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"tasks.max": 1,
"connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
"kafka.topic": "nested-topic",
"cleanup.policy": "NONE",
"behavior.on.error": "IGNORE",
"input.path": "/home/project/opt/confluent-6.2.0/sftp/data",
"error.path": "/home/project/opt/confluent-6.2.0/sftp/error",
"finished.path": "/home/project/opt/confluent-6.2.0/sftp/finished",
"input.file.pattern": ".*.csv",
"sftp.username": "user",
"sftp.password": "password",
"sftp.host": "10.254.1.6",
"sftp.port": "22",
"csv.ignore.leading.whitespace": "true",
"csv.first.row.as.header": "false",
"csv.skip.lines": 1,
"key.schema": "{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : true,\"fieldSchemas\" : {\"material\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}",
"value.schema": "{ \"name\" : \"com.example.users.User\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"name\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"street\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"number\" : { \"isOptional\" : false, \"type\" : \"STRING\" } } }"
}
}
And here is my "value.schema" formatted for readability:
{
"name" : "com.example.users.User",
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"name" : {
"isOptional" : false,
"type" : "STRING"
},
"street" : {
"isOptional" : false,
"type" : "STRING"
},
"number" : {
"isOptional" : false,
"type" : "STRING"
}
}
}
Upvotes: 0
Views: 628
Reputation: 191738
You would need to write a custom Single Message Transform (SMT) to restructure multiple fields like this at once.
Alternatively, after loading the data with Connect, use ksqlDB or Kafka Streams to map it into the output format you expect.
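Whichever route is taken, the core step is the same: move `street` and `number` under a new `address` field. The following is only a minimal sketch of that reshaping logic on a plain `Map`, so that it runs without any Kafka libraries. The class `NestAddress` and its `nest` method are hypothetical names, not part of Kafka Connect. A real SMT would implement `org.apache.kafka.connect.transforms.Transformation` and, because transforms run before the converter, would most likely have to rebuild the record's `Struct` and `Schema` rather than a map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Kafka Connect class: it illustrates only the
// reshaping that a custom SMT or a Kafka Streams mapping step would perform.
final class NestAddress {

    // Returns a copy of `flat` in which the fields listed in `children`
    // are moved under a new nested field named `parent`.
    static Map<String, Object> nest(Map<String, Object> flat,
                                    String parent,
                                    List<String> children) {
        Map<String, Object> nested = new LinkedHashMap<>();
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : flat.entrySet()) {
            if (children.contains(entry.getKey())) {
                nested.put(entry.getKey(), entry.getValue());
            } else {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        // The nested object is appended after the remaining top-level fields.
        result.put(parent, nested);
        return result;
    }

    public static void main(String[] args) {
        // Field names match the value.schema from the question.
        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put("name", "Homer Simpson");
        flat.put("street", "742 Evergreen Terrace");
        flat.put("number", "Springfield");

        System.out.println(nest(flat, "address", List.of("street", "number")));
    }
}
```

In a Kafka Streams application, the same function body could serve as the mapping step applied to each record value, assuming the values are deserialized into maps first.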
Upvotes: 1