Peter Lyons

Reputation: 146074

ExtractField and Parse JSON in kafka-connect sink

I have a kafka-connect flow of mongodb -> kafka connect -> elasticsearch that sends data end to end OK, but the payload document arrives as a JSON-encoded string. Here's my source mongodb document.

{
  "_id": "1541527535911",
  "enabled": true,
  "price": 15.99,
  "style": {
    "color": "blue"
  },
  "tags": [
    "shirt",
    "summer"
  ]
}

And here's my mongodb source connector configuration:

{
  "name": "redacted",
  "config": {
    "connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
    "databases": "redacted.redacted",
    "initial.import": "true",
    "topic.prefix": "redacted",
    "tasks.max": "8",
    "batch.size": "1",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
    "key.serializer.schemas.enable": false,
    "value.serializer.schemas.enable": false,
    "compression.type": "none",
    "mongo.uri": "mongodb://redacted:27017/redacted",
    "analyze.schema": false,
    "schema.name": "__unused__",
    "transforms": "RenameTopic",
    "transforms.RenameTopic.type":
      "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RenameTopic.regex": "redacted.redacted_Redacted",
    "transforms.RenameTopic.replacement": "redacted"
  }
}

Over in elasticsearch, it ends up looking like this:

{
  "_index" : "redacted",
  "_type" : "kafka-connect",
  "_id" : "{\"schema\":{\"type\":\"string\",\"optional\":true},\"payload\":\"1541527535911\"}",
  "_score" : 1.0,
  "_source" : {
    "ts" : 1541527536,
    "inc" : 2,
    "id" : "1541527535911",
    "database" : "redacted",
    "op" : "i",
    "object" : "{ \"_id\" : \"1541527535911\", \"price\" : 15.99,
      \"enabled\" : true, \"tags\" : [\"shirt\", \"summer\"],
      \"style\" : { \"color\" : \"blue\" } }"
  }
}

I'd like to use two single message transforms:

  1. ExtractField to grab object, which is a string of JSON
  2. Something to parse that JSON into an object, or just let the normal JSONConverter handle it, as long as it ends up properly structured in elasticsearch.
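To make the two steps concrete, here is a minimal Python sketch run against the `_source` shown above. It is only an illustration of the intended data flow, not a Kafka Connect transform; the function names `extract_field` and `parse_json` are made up for this example.

```python
import json

# The record value as it currently lands in Elasticsearch (from the _source above).
record_value = {
    "ts": 1541527536,
    "inc": 2,
    "id": "1541527535911",
    "database": "redacted",
    "op": "i",
    "object": '{ "_id" : "1541527535911", "price" : 15.99, '
              '"enabled" : true, "tags" : ["shirt", "summer"], '
              '"style" : { "color" : "blue" } }',
}


def extract_field(value: dict, field: str):
    """Step 1 (hypothetical helper): keep only one field of the value."""
    return value[field]


def parse_json(text: str) -> dict:
    """Step 2 (hypothetical helper): turn the JSON string into a structured document."""
    return json.loads(text)


document = parse_json(extract_field(record_value, "object"))
print(document["style"]["color"])  # blue
print(document["tags"])            # ['shirt', 'summer']
```

After both steps, `document` is a nested structure rather than a string, which is the shape the sink needs to index.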

I've attempted to do it with just ExtractField in my sink config, but Kafka Connect logs this error:

kafka-connect_1       | org.apache.kafka.connect.errors.ConnectException:
Bulk request failed: [{"type":"mapper_parsing_exception",
"reason":"failed to parse", 
"caused_by":{"type":"not_x_content_exception",
"reason":"Compressor detection can only be called on some xcontent bytes or
compressed xcontent bytes"}}]

Here's my elasticsearch sink connector configuration. In this version, I have things working but I had to code a custom ParseJson SMT. It's working well, but if there's a better way or a way to do this with some combination of built-in stuff (converters, SMTs, whatever works), I'd love to see that.

{
  "name": "redacted",
  "config": {
    "connector.class":
      "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "batch.size": 1,
    "connection.url": "http://redacted:9200",
    "key.converter.schemas.enable": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "schema.ignore": true,
    "tasks.max": "1",
    "topics": "redacted",
    "transforms": "ExtractFieldPayload,ExtractFieldObject,ParseJson,ReplaceId",
    "transforms.ExtractFieldPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldPayload.field": "payload",
    "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldObject.field": "object",
    "transforms.ParseJson.type": "reaction.kafka.connect.transforms.ParseJson",
    "transforms.ReplaceId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceId.renames": "_id:id",
    "type.name": "kafka-connect",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

Upvotes: 1

Views: 6230

Answers (1)

OneCricketeer

Reputation: 191738

I am not sure about your Mongo connector. I don't recognize the class or the configurations. Most people probably use the Debezium Mongo connector.

I would set it up this way, though:

"connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",

"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
"key.serializer.schemas.enable": false,
"value.serializer.schemas.enable": true,

The schemas.enable setting is important: it lets the internal Connect data classes know how to convert to and from other formats.

Then, in the sink, you again need to use the JSON deserializer (via the converter) so that it creates a full object rather than the plaintext string you currently see in Elasticsearch ({\"schema\":{\"type\":\"string\").

"connector.class":
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true
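
For context, with schemas.enable set to true the JsonConverter works with an envelope that has a `schema` field and a `payload` field, which is the shape visible in the `_id` from the question. Below is a rough Python illustration of that envelope using the key from the question. The `unwrap` helper is hypothetical and only mimics the idea; it is not the converter's actual code.

```python
import json

# The raw key exactly as it shows up in the Elasticsearch _id above.
raw_key = '{"schema":{"type":"string","optional":true},"payload":"1541527535911"}'


def unwrap(raw: str):
    """Hypothetical helper: return (schema, payload) from a schema+payload envelope."""
    envelope = json.loads(raw)
    return envelope["schema"], envelope["payload"]


schema, payload = unwrap(raw_key)
print(schema["type"])  # string
print(payload)         # 1541527535911
```

A converter that treats the bytes as a plain string passes the whole envelope through untouched, which would explain why the full JSON text ends up as the document ID.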

If this doesn't work, you might have to manually create your index mapping in Elasticsearch ahead of time so it knows how to parse the strings you are sending it.
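
As a sketch of what such a mapping could look like for the sample document, the Python below builds a create-index request body. The field types are guesses based on the values in the question, and nesting under the `kafka-connect` type assumes an older Elasticsearch version that still uses mapping types (as the `_type` in the question suggests). Whether a mapping alone resolves the problem is not confirmed here.

```python
import json

# Hypothetical mapping for the sample document; field types are guesses
# based on the values shown in the question.
properties = {
    "enabled": {"type": "boolean"},
    "price": {"type": "float"},
    "style": {"properties": {"color": {"type": "keyword"}}},
    "tags": {"type": "keyword"},
}

# The question's index uses the "kafka-connect" type, so the mapping is nested
# under that type name (assumption: an Elasticsearch version with mapping types).
index_body = {"mappings": {"kafka-connect": {"properties": properties}}}

# This JSON would be the request body of a create-index call (PUT /<index-name>).
print(json.dumps(index_body, indent=2))
```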

Upvotes: 1
