Alberto Pires

Reputation: 319

Kafka/Confluent CSV/SFTP connector and nested json

I'm trying to read a CSV file and send it as JSON to a Kafka topic. I started from the connector example on the CSV Source Connector for Confluent Platform page, and I'm using a local Confluent installation. A basic example works fine. My sample CSV data is:

name,street,city
Homer Simpson,742 Evergreen Terrace,Springfield

And I was able to read JSON like this from the topic:

{
  "name": "Homer Simpson",
  "street": "742 Evergreen Terrace",
  "number": "Springfield"
}

Now I need to turn this CSV line into the following nested JSON:

  {
    "name": "Homer Simpson",
    "address": {
      "street": "742 Evergreen Terrace",
      "number": "Springfield"
    }
  }

Here is the source connector configuration I'm using:

{
    "name": "NestedExample",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "tasks.max": 1,
        "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
        "kafka.topic": "nested-topic",
        "cleanup.policy": "NONE",
        "behavior.on.error": "IGNORE",
        "input.path": "/home/project/opt/confluent-6.2.0/sftp/data",
        "error.path": "/home/project/opt/confluent-6.2.0/sftp/error",
        "finished.path": "/home/project/opt/confluent-6.2.0/sftp/finished",
        "input.file.pattern": ".*.csv",
        "sftp.username": "user",
        "sftp.password": "password",
        "sftp.host": "10.254.1.6",
        "sftp.port": "22",
        "csv.ignore.leading.whitespace": "true",
        "csv.first.row.as.header": "false",
        "csv.skip.lines": 1,
        "key.schema": "{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : true,\"fieldSchemas\" : {\"material\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}",
        "value.schema": "{ \"name\" : \"com.example.users.User\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"name\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"street\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"number\" : { \"isOptional\" : false, \"type\" : \"STRING\" } } }"
    }
}

And here is my "value.schema" formatted for readability:

{
  "name" : "com.example.users.User",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "name" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "street" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "number" : {
      "isOptional" : false,
      "type" : "STRING"
    }
  }
}

Upvotes: 0

Views: 628

Answers (1)

OneCricketeer

Reputation: 191738

You would need to write a Single Message Transform (SMT) to restructure multiple fields like this at once.

Alternatively, after using Connect, use ksqlDB or Kafka Streams to map the data into the output format you expect.
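
Whichever route you take, the core step is the same: pull "street" and "number" out of the flat record and place them under a new "address" field. Here is a minimal sketch of that step in plain Java, with no Kafka dependencies. It assumes the value is available as a Map (as with schemaless JSON). Inside a real SMT this logic would live in the transform's apply() method, and it would need to work on a Struct and build a matching nested schema if the record carries one. The class and method names are hypothetical and not part of Kafka Connect or the SFTP connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a Kafka Connect API.
final class NestAddress {

    // Returns a new map in which "street" and "number" are moved under "address".
    // All other fields are copied through unchanged, in their original order.
    static Map<String, Object> nest(Map<String, Object> flat) {
        Map<String, Object> address = new LinkedHashMap<>();
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : flat.entrySet()) {
            String field = e.getKey();
            if (field.equals("street") || field.equals("number")) {
                address.put(field, e.getValue());
            } else {
                out.put(field, e.getValue());
            }
        }
        out.put("address", address);
        return out;
    }

    public static void main(String[] args) {
        // Flat record with the same field names as the value.schema in the question.
        Map<String, Object> flat = new LinkedHashMap<>();
        flat.put("name", "Homer Simpson");
        flat.put("street", "742 Evergreen Terrace");
        flat.put("number", "Springfield");
        System.out.println(nest(flat));
    }
}
```

The same function could be called from a Kafka Streams mapValues step if you prefer to reshape the data after Connect has written it to the topic.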

Upvotes: 1
