Reputation: 27
Is there a Kafka connector that can handle this kind of request?
I receive data in a Kafka topic in the following format (the number of rows inside the JSON is random):
{
  "1574922337":[{"price": 1, "product": 2}],
  "1574922338":[{"price": 13, "product": 5}],
  "1574922339":[{"price": 0.2, "product": 1}]
}
I want the Kafka connector to parse this JSON message received from the topic and create 3 documents at a time with id=TIMESTAMP, using UPSERT (if the id already exists, the document is only updated; if it does not exist, it is added).
Do you have an approach for this, or any references?
Alternatively, is there an open source connector that is easy to compile with Maven? I would make some changes to adapt it to this kind of request.
Thanks in advance for your help.
Upvotes: 1
Views: 642
Reputation: 217344
OK, here goes. As I suggested, this should work pretty well by leveraging Logstash with:
- a kafka input to read messages
- a ruby filter to slice and dice the message into several events
- an elasticsearch output to perform upserts

The main Logstash configuration follows:
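input {
  kafka {
    bootstrap_servers => "..."
    # plugin versions that accept bootstrap_servers take a list of topics
    topics => ["message_topic"]
    # start from the beginning when the consumer group has no committed offset
    auto_offset_reset => "earliest"
    # assumption: messages are JSON, so decode them into top-level fields
    codec => "json"
  }
}
filter {
  # drop metadata fields so that only the timestamp keys remain
  mutate {
    remove_field => ["@timestamp", "@version", "host"]
  }
  ruby {
    path => "/path/to/split.rb"
  }
  # the new events get their own metadata fields; drop those as well
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output {
  elasticsearch {
    hosts => "https://..."
    http_compression => true
    index => "product_index"
    document_id => "%{id}"
    document_type => "type_name"
    # update the document if the id exists, otherwise insert it
    action => "update"
    doc_as_upsert => true
  }
}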
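To make the upsert settings more concrete, here is a minimal Ruby sketch of the kind of bulk request body that action => "update" combined with doc_as_upsert => true roughly corresponds to on the Elasticsearch side. The helper name bulk_upsert_lines is hypothetical, the document type is left out, and the exact payload Logstash sends may differ in detail.

```ruby
require 'json'

# Hypothetical helper: builds the two NDJSON lines of one bulk "update" action.
# With doc_as_upsert, the partial document is also used as the document to
# insert when the id does not exist yet.
def bulk_upsert_lines(index, doc)
  id = doc.fetch('id')
  action = { 'update' => { '_index' => index, '_id' => id } }
  body   = { 'doc' => doc, 'doc_as_upsert' => true }
  [JSON.generate(action), JSON.generate(body)]
end

lines = bulk_upsert_lines(
  'product_index',
  { 'id' => '1574922337', 'price' => 1, 'product' => 2 }
)
puts lines
```

Each event leaving the filter stage would produce one such pair of lines, with the id taken from the event's id field.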
The Ruby code in split.rb is pretty simple. It iterates over each timestamp and, for each element of the array that the timestamp points to, creates a new event with the timestamp as the id field.
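# no script parameters are needed
def register(params)
end

# turn one event keyed by timestamp into one event per array element
def filter(event)
  events = []
  event.to_hash.each do |timestamp, array|
    array.each do |sub|
      sub_event = LogStash::Event.new(sub)
      sub_event.set('id', timestamp)
      events << sub_event
    end
  end
  # only the returned events continue through the pipeline
  events
end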
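If you want to check the slicing logic without starting Logstash, the same idea can be sketched in plain Ruby on hashes. This is only an approximation of the filter above: split_message is a hypothetical name, and plain hashes stand in for LogStash::Event objects.

```ruby
require 'json'

# Same slicing idea as the filter, but on plain hashes:
# every row of every timestamp becomes its own document with an 'id' field.
def split_message(message)
  message.flat_map do |timestamp, rows|
    rows.map { |row| row.merge('id' => timestamp) }
  end
end

raw = '{"1574922337":[{"price":1,"product":2}],' \
      '"1574922338":[{"price":13,"product":5}],' \
      '"1574922339":[{"price":0.2,"product":1}]}'

docs = split_message(JSON.parse(raw))
docs.each { |doc| puts JSON.generate(doc) }
```

Note that if one timestamp holds several rows, they all get the same id, so later rows would overwrite earlier ones when upserted.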
Basically, what it produces for the sample message you gave above is the following:
{"id":"1574922337","product":2,"price":1}
{"id":"1574922339","product":1,"price":0.2}
{"id":"1574922338","product":5,"price":13}
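For intuition about what then happens to these documents, here is a small in-memory Ruby sketch of upsert behaviour. It is a simplification and not how Elasticsearch is implemented: a hash stands in for the index, upsert is a hypothetical helper, only top-level fields are merged, and the third call is an invented later message for an id that already exists.

```ruby
# In-memory stand-in for an index: id => document.
def upsert(store, doc)
  id = doc.fetch('id')
  # insert the document if the id is new, otherwise overwrite only the given fields
  store[id] = store.fetch(id, {}).merge(doc)
  store
end

store = {}
upsert(store, { 'id' => '1574922337', 'price' => 1, 'product' => 2 })
upsert(store, { 'id' => '1574922338', 'price' => 13, 'product' => 5 })
# hypothetical later message for an existing id: only the price changes
upsert(store, { 'id' => '1574922337', 'price' => 3 })

store.each_value { |doc| p doc }
```

The existing document keeps its product field and only its price is replaced, which is the "update if present, add if missing" behaviour asked for in the question.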
Upvotes: 2