Unable to convert Kafka topic data into structured JSON with Confluent Elasticsearch sink connector

I'm building a data pipeline using Kafka. Data flow is as follows: capture data change in mongodb and have it sent to elasticsearch.

Since I'm still testing, Kafka-related systems are running on single server.

Everything is fine with above system. Kafka connector captures data changes (CDC) and successfully sends it to elasticsearch via sink connector. The problem is that I cannot convert string-type-messaged data into structured data type. For instance, let's consume topic-data after making some changes to mongodb.

    $ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then, I get following result.

    "after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      "source": {
        "version": {
          "string": "0.7.5"
        "name": "higee",
        "rs": "",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        "initsync": {
          "boolean": false
      "op": {
        "string": "u"
      "ts_ms": {
        "long": 1524214412159

Then, if I go to elasticsearch, I get following result.

        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          "op": "u",
          "ts_ms": 1524214412159

One that I want to achieve is something as follows

        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"

Some of the options I've been trying and still considering is as follows.

Any idea how I can jsonify data in elasticsearch? Following are sources I searched.

Thanks in advance.

Some attempts

1) I've changed my connect-mongo-source.properties file as follows to test transformation.

    $ cat etc/kafka/connect-mongo-source.properties
    transforms.unwrap.type = io.debezium.connector.mongodbtransforms.UnwrapFromMongoDbEnvelope

And following is error log I got. Not yet being comfortable with Kafka and more importantly debezium platform, I wasn't able to debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) In this time, I've changed elasticsearch.properties and didn't make a change to connect-mongo-source.properties.

$ cat connect-mongo-source.properties


$ cat elasticsearch.properties

    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

And I got following error.

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3) changed test.avsc and ran logstash. I didn't get any error message but the outcome wasn't something I was expecting in that origin, salary, name fields were all empty even though they were given non-null values. I was even able to read data through console-consumer properly.

$ cat test.avsc
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
      } ]

$ cat logstash3.conf
    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"

    output {
      stdout {
       codec => rubydebug

$ bin/logstash -f logstash3.conf
    "@version" => "1",
    "_id" => {
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    "@timestamp" => 2018-04-25T09:39:07.962Z

Robin Moffatt
Robin Moffatt

Reputation: 32090

+1 to @cricket_007's suggestion - use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transformation. You can read more about SMTs and their benefit's here.

Upvotes: 1


Reputation: 377

I was able to solve this issue using python kafka client. Following is new architecture of my pipeline.

I used python 2 even though Confluent document says that python3 is supported. Main reason was that there were some python2-syntax code. For instance...(Not exactly following line but similar syntax)

    except NameError, err:

In order to use with Python3 I need to convert above lines into:

    except NameError as err:

That being said, following is my python code. Note that this code is only for prototyping and not for production yet.

Consume a message via Confluent Consumer

  • code

    from confluent_kafka.avro import AvroConsumer
    c = AvroConsumer({ 
           'bootstrap.servers': '',
           'group.id': 'groupid',
           'schema.registry.url': ''
    x = True
    while x:
        msg = c.poll(100)
        if msg:
            message = msg.value()
            x = False
  • (after updating a document in mongodb) let's check message variable

    {u'after': None,
     u'op': u'u',
     u'patch': u'{
         "_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
         "name" : "higee",
         "salary" : 100,
         "origin" : "S Korea"}',
     u'source': {
         u'h': 5734791721791032689L,
         u'initsync': False,
         u'name': u'higee',
         u'ns': u'higee.higee',
         u'ord': 1,
         u'rs': u'',
         u'sec': 1524362971,
         u'version': u'0.7.5'},
     u'ts_ms': 1524362971148

manipulate message consumed

  • code

    patch = message['patch']
    patch_dict = eval(patch)
  • check patch_dict

    {'name': 'higee', 'origin': 'S Korea', 'salary': 100}

Produce a message via Confluent Producer

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
       "namespace": "higee.higee",
       "name": "MongoEvent",
       "type": "record",
       "fields" : [
               "name" : "name",
               "type" : "string"
              "name" : "origin",
              "type" : "string"
               "name" : "salary",
               "type" : "int32"
    AvroProducerConf = {
        'bootstrap.servers': '',
        'schema.registry.url': ''

    value_schema = avro.load('./user.avsc')
    avroProducer = AvroProducer(

    avroProducer.produce(topic='python', value=patch_dict)

The only thing left is to make elasticsearch sink connector respond to new topic 'python' by setting configuration in following format. Everything remains the same except topics.

    connector.class= io.confluent.connect \ 

Then run the elasticsearch sink connector and have it checked at elasticsearch.

        "_index": "zzzz",
        "_type": "kafka-connect",
        "_id": "zzzz+0+3",
        "_score": 1,
        "_source": {
          "name": "higee",
          "origin": "S Korea",
          "salary": 100

Upvotes: 2


Reputation: 191738

Python Client

You must use the Avro Consumer, otherwise you will get 'utf-8' codec can't decode byte

Even this example will not work because you still need the schema registry to lookup the schema.

The prerequisites of Confluent's Python Client says it works with Python 3.x

Nothing is stopping you from using a different client, so not sure why you left it at only trying Python.

Logstash Avro Codec

  1. JSON Codec cannot decode Avro data. I don't think the json filter following the avro input codec will work either
  2. Your Avro schema is wrong - You're missing the $oid in place of _id
  3. There is a difference between "raw Avro" (that includes the schema within the message itself), and Confluent's encoded version of it (which only contains the schema ID in the registry). Meaning, Logstash doesn't integrate with the Schema Registry... at least not without a plugin.

Your AVSC should actually look like this

  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
  } ]

However, Avro doesn't allow for names starting with anything but a regex of [A-Za-z_], so that $oid would be a problem.

While I would not recommend it (nor have actually tried it), one possible way to get your JSON-encoded Avro data into Logstash from the Avro console consumer could be use the Pipe input plugin

input {
  pipe {
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 


note that the after value is always a string, and that by convention it will contain a JSON representation of the document


I think this also applies to patch values, but I don't know Debezium, really.

Kafka won't parse the JSON in-flight without the use of a Simple Message Transform (SMT). Reading the documentation you linked to, you should probably add these to your Connect Source properties


Also worth pointing out, field flattening is on the roadmap - DBZ-561

Kafka Connect Elasticsearch

Elasticsearch doesn't parse and process encoded JSON string objects without the use of something like Logstash or its JSON Processor. Rather, it only indexes them as a whole string body.

If I recall correctly, Connect will only apply an Elasticsearch mapping to top-level Avro fields, not nested ones.

In other words, the mapping that is generated follows this pattern,

"patch": {
    "string": "...some JSON object string here..."

Where you actually need to be like this - perhaps manually defining your ES index

"patch": {
   "properties": {
      "_id": {
        "properties" {
          "$oid" :  { "type": "text" }, 
          "name" :  { "type": "text" },
          "salary":  { "type": "int"  }, 
          "origin": { "type": "text" }

Again, not sure if the dollar sign is allowed, though.

Kafka Connect MongoDB Source

If none of the above are working, you could attempt a different connector

Upvotes: 2

