Picsou Gamer

Reputation: 27

Looking for a Kafka connector to upsert data to Elasticsearch

Is there a Kafka connector that can handle this kind of request?

I receive data in a Kafka topic in this format (the number of rows inside the JSON varies):

{
"1574922337":[{"price": 1, "product": 2}],
"1574922338":[{"price": 13, "product": 5}], 
"1574922339":[{"price": 0.2, "product": 1}]
}

I want the Kafka connector to parse the JSON message received from the topic and create 3 documents at a time with id=TIMESTAMP, using UPSERT (if the id already exists, the document is updated; otherwise it is added).

Do you have an approach or references for this?

Alternatively, is there an open source connector that is easy to compile with Maven? I would make some changes to adapt it to this kind of request.

Thanks in advance for your help.

Upvotes: 1

Views: 642

Answers (1)

Val

Reputation: 217344

OK, here goes. As I suggested, this should work pretty well by leveraging Logstash with a kafka input, a ruby filter and an elasticsearch output.

The main Logstash configuration follows:

input {
  kafka {
    bootstrap_servers => "..."
    topic_id => "message_topic"
    auto_offset_reset => "smallest"
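    reset_beginning => true
    # Assumption: decode each message as JSON so the timestamps become
    # top-level fields (some plugin versions default to the plain codec)
    codec => "json"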
  }
}
filter {
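  # Drop Logstash metadata so the ruby filter only sees the timestamp keys
  mutate {
    remove_field => ["@timestamp", "@version", "host"]
  }
  ruby {
    path => "/path/to/split.rb"
  }
  # Each new event created in split.rb gets its own @timestamp and @version,
  # so remove them again before indexing
  mutate {
    remove_field => ["@timestamp", "@version"]
  }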
}
output {
  elasticsearch {
    hosts => "https://..."
    http_compression => true
    index => "product_index"
    document_id => "%{id}"
    document_type => "type_name"
    action => "update"
    doc_as_upsert => true
  }
}
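
To make the upsert part more concrete: with action => "update" and doc_as_upsert => true, each event is sent to Elasticsearch as a bulk update whose body doubles as the document to create when the id does not exist yet. The sketch below only illustrates the shape of such a bulk entry; bulk_upsert_lines is a hypothetical helper written for this illustration, and the exact payload Logstash sends may differ by version (for example, it may also carry the document type).

```ruby
require 'json'

# Hypothetical helper (not part of Logstash): builds the two NDJSON lines
# of an Elasticsearch bulk "update" action for one document.
def bulk_upsert_lines(index, doc)
  # Action line: which index and which document id to update
  action = { 'update' => { '_index' => index, '_id' => doc['id'] } }
  # Body line: partial document, also used as the full document on insert
  body = { 'doc' => doc, 'doc_as_upsert' => true }
  [JSON.generate(action), JSON.generate(body)]
end

puts bulk_upsert_lines('product_index',
                       { 'id' => '1574922337', 'price' => 1, 'product' => 2 })
```

If a document with that id already exists, the fields in doc are merged into it; otherwise doc itself is indexed as a new document.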

The ruby code in split.rb is pretty simple. What it does is iterate over each timestamp and, for each element of the array that timestamp points to, create a new event with the timestamp as the id field.

def register(params)
end

def filter(event)
  events = []
  event.to_hash.each do |timestamp,array|
    array.each do |sub| 
      subEvent = LogStash::Event.new(sub)
      subEvent.set('id', timestamp)
      events << subEvent
    end
  end
  return events
end

Basically, what it produces for the sample message you gave above is the following:

{"id":"1574922337","product":2,"price":1}
{"id":"1574922339","product":1,"price":0.2}
{"id":"1574922338","product":5,"price":13}
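
If you want to check the splitting logic without a running Logstash, here is a minimal standalone sketch of the same idea. It is an approximation, not the filter itself: plain hashes stand in for LogStash::Event, and split_by_timestamp and SAMPLE are names made up for this example.

```ruby
require 'json'

# Standalone approximation of split.rb: one output hash per array element,
# with the enclosing timestamp key copied into the 'id' field.
def split_by_timestamp(message)
  docs = []
  message.each do |timestamp, rows|
    rows.each do |row|
      docs << row.merge('id' => timestamp)
    end
  end
  docs
end

# Sample message from the question (assumed to be valid JSON)
SAMPLE = '{"1574922337":[{"price": 1, "product": 2}], ' \
         '"1574922338":[{"price": 13, "product": 5}], ' \
         '"1574922339":[{"price": 0.2, "product": 1}]}'

split_by_timestamp(JSON.parse(SAMPLE)).each { |doc| puts JSON.generate(doc) }
```

Since the id is the timestamp, several array elements under the same timestamp would all target the same document, so later ones would be merged into earlier ones by the upsert.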

Upvotes: 2
