Reputation: 45
I have 1) a single CSV file and 2) a live Kafka stream. The Kafka stream brings in live streaming logs, and the CSV file contains metadata records that I need to join with the streaming logs before sending them to Elasticsearch.
Example of a Kafka stream log and a CSV record:
Kafka log: MachineID: 2424, MachineType: 1, MessageType: 9
CSV record: MachineID: 2424, MachineOwner: JohnDuo
Record I need to build in logstash before sending to ES:
MachineID: 2424
MachineOwner: JohnDuo
MachineType: 1
MessageType: 9
I want a solution, whether a Ruby filter, a Logstash plugin, or anything else, that reads this CSV file once and performs the join in the Logstash conf file. I need to keep the content of the CSV file in memory, otherwise CSV look-ups on each live Kafka log kill my Logstash performance.
Upvotes: 0
Views: 431
Reputation: 7463
Try the translate filter.
You would need something like this.
filter {
  translate {
    dictionary_path => "/path/to/your/csv/file.csv"
    field => "[MachineID]"
    destination => "[MachineOwner]"
    fallback => "not found"
  }
}
Then, in your file.csv, you will have the following.
2424,JohnDuo
2425,AnotherUser
For every event that has the field MachineID, this filter will look up that id in the dictionary. If it finds a match, it will create a field named MachineOwner with the matched value; if it does not find a match, it will create the field MachineOwner with the value not found. If you do not want to create the field when there is no match, you can remove the fallback option.
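If you keep the fallback, you can act on the not found value with a conditional. A minimal sketch (the tag name is just an example) could look like this:
filter {
  if [MachineOwner] == "not found" {
    mutate {
      add_tag => ["owner_missing"]  # example tag; use drop {} instead if you want to discard the event
    }
  }
}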
The dictionary is loaded into memory when Logstash starts and is reloaded every 300 seconds; you can change that behaviour with the refresh_interval option.
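For context, a complete pipeline for this scenario might look like the sketch below; the Kafka broker, topic, codec, Elasticsearch host, and index name are all assumptions about your setup and need to be adjusted:
input {
  kafka {
    bootstrap_servers => "localhost:9092"  # assumed broker address
    topics => ["machine-logs"]             # assumed topic name
    codec => "json"                        # assumed: logs already arrive as JSON
  }
}
filter {
  translate {
    dictionary_path => "/path/to/your/csv/file.csv"
    field => "[MachineID]"
    destination => "[MachineOwner]"
    fallback => "not found"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]     # assumed Elasticsearch address
    index => "machine-logs-%{+YYYY.MM.dd}" # assumed index name pattern
  }
}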
Upvotes: 1