Reputation: 12871
I've written a filter and use its register function to load an external CSV file and fill a bunch of hash tables. The filter function then accesses those hash tables and adds fields to the event. While that works nicely, the downside is that the CSV is only loaded once, so I'd need to restart Logstash to trigger a reload after the file changes. I should perhaps add that the filter currently consumes events coming from three different file inputs.
Writing an input doesn't seem to solve it, as an input isn't tied to the filter in any way. My plan is therefore to reload the CSV file every few hours, or at a particular time, and to block the entire filter while that happens, i.e. pause incoming events. That sounds like a weird thing to do, and I'm not sure whether Logstash is actually meant to be used like this.
I'm a Ruby newbie and actually quite amazed that the filter works this nicely. Since Google let me down on the whole issue, I'm hoping that someone here has experience with this, can post a link to an example, or can point me to another way of solving it.
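To illustrate, the filter roughly follows the usual plugin skeleton; the file path, the CSV layout and the field names below are just placeholders, not my real configuration:
require "csv"

public
def register
  @lookup = {}
  # read the external CSV once at startup and fill the hash table
  CSV.foreach("/etc/logstash/lookup.csv") do |row|
    @lookup[row[0]] = row[1]
  end
end # def register

def filter(event)
  return unless filter?(event)
  # enrich the event with a field looked up from the hash table
  value = @lookup[event["lookup_key"]]
  event["extra_field"] = value if value
  filter_matched(event)
end # def filter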
Upvotes: 0
Views: 498
Reputation: 12871
For educational purposes I looked into the Logstash source and noticed that I could actually understand what's going on; things are much less complicated than I had thought.
There is a function filterworker in pipeline.rb as well as a class filterworker, and I don't know which one is actually used, but my findings seem to hold for both.
Basically, all filters seem to run in one thread unless configured otherwise. This means I can reload the file anywhere in the filter function and the entire processing for all filters is paused (inputs and outputs might still do something, but that's handled by the event queue, which holds at most 20 entries).
Therefore, this seems to do it for me:
public
def register
  @config_files_read_timestamps = {}
  read_config_files
end # def register

def filter(event)
  # return nothing unless there's an actual filter event
  return unless filter?(event)

  # reload the config files in case they changed on disk
  read_config_files
  :

  # filter_matched should go in the last line of our successful code
  filter_matched(event)
end # def filter

private
def read_config_files
  read_marker_file
  :
end

# remembers each file's mtime and returns true only if the file changed
def check_for_changed_file?(filename)
  mtime = File.mtime(filename)
  @config_files_read_timestamps[filename] ||= Time.at(0)
  if @config_files_read_timestamps[filename] < mtime
    @config_files_read_timestamps[filename] = mtime
    return true
  end
end

def read_marker_file
  return unless check_for_changed_file?("markers.txt")
  :
end
Obviously I don't need a separate thread for the parsing. It would become necessary if I wanted to start the reload at a specific time; in that case I'd have to join the thread and then continue with event handling.
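For completeness, if I ever switch to reloading on a fixed schedule, I imagine that instead of joining a reload thread from the filter, a background thread plus a mutex around the hash-table access would achieve the same pause. The interval and the mutex below are just a rough, untested idea:
public
def register
  @config_files_read_timestamps = {}
  @reload_mutex = Mutex.new
  read_config_files

  # background thread that reloads the config files on a fixed schedule
  Thread.new do
    loop do
      sleep 3600   # placeholder: once per hour
      @reload_mutex.synchronize { read_config_files }
    end
  end
end # def register

def filter(event)
  return unless filter?(event)
  # events briefly wait here while a reload holds the mutex
  @reload_mutex.synchronize do
    # ... look up fields in the hash tables and add them to the event ...
  end
  filter_matched(event)
end # def filter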
Let me know if there's room for improvement...
Upvotes: 1