Reputation: 47
Filebeat.yml file:
filebeat.inputs:
- type: log
  paths:
    - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
  exclude_lines: ['^Infobase.+']

output.logstash:
  hosts: ["localhost:5044"]
  worker: 1
Filebeat collects logs from a folder structure like this:
C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
There are many folders here, and each of the deepest ones contains at least several log files.
Example of the log files (in different files the times may coincide, since the logs come from different users):
"03.08.2020 10:56:38","Event LClick","Type Menu","t=0","beg"
"03.08.2020 10:56:38","Event LClick","Type Menu","Detail SomeDetail","t=109","end"
"03.08.2020 10:56:40","Event LClick","t=1981","beg"
"03.08.2020 10:56:40","Event LClick","t=2090","end"
"03.08.2020 10:56:41","Event LClick","Type ToolBar","t=3026","beg"
"03.08.2020 10:56:43","Event LClick","Type ToolBar","Detail User_Desktop","t=4477","end"
"03.08.2020 10:56:44","Event FormActivate","Name Form_Name:IsaA","t=5444"
"03.08.2020 10:56:51","Event LClick","t=12543","beg"
"03.08.2020 10:56:51","Event LClick","t=12605","end"
"03.08.2020 10:56:52","Event LClick","Form ","Type Label","Name Application.for.training","t=13853","beg"
"03.08.2020 10:57:54","Event LClick","Form Application.for.training","Type Label","Name Application.for.training","t=75442","end"
"03.08.2020 10:57:54","Event FormActivate","Name List.form","t=75785"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85769","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85847","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85879","end"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","beg"
"03.08.2020 10:58:04","Event Wheel","Form List.form","Type FormTable","Name Список","t=85925","end"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","t=89373","beg"
"03.08.2020 10:58:08","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=89451","end"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","t=96580","beg"
"03.08.2020 10:58:15","Event LClick","Form List.form","Type FormTable","Name Список","Detail Data","t=96643","end"
Logstash config file:
input {
  beats {
    port => '5044'
  }
}
filter {
  grok {
    patterns_dir => ['./patterns']
    match => { 'message' => '%{TIME:timestamp}(","Event\s)(?<Event>([^"]+))(","Form\s)?(?<Form>([^"]+))?(","ParentType\s)?(?<parent_type>([^"]+))?(","ParentName\s)?(?<parent_name>([^"]+))?(","Type\s)?(?<type>([^"]+))?(","Name\s)?(?<Name_of_form>([^"]+))?(","Detail\s)?(?<Detail>([^"]+))?(","t=)?(?<t>([\d]+))?(",")?(?<Status>(end|beg))?' }
    add_tag => [ '%{Status}' ]
  }
  dissect {
    mapping => {
      '[log][file][path]' => 'C:\Program Files\Filebeat\test_logs\%{somethingtoo}\%{something}\%{User_Name}\%{filename}.txt'
    }
  }
  date {
    match => [ 'timestamp', 'dd.MM.yyyy HH:mm:ss' ]
  }
  elapsed {
    unique_id_field => 'Event'
    start_tag => 'beg'
    end_tag => 'end'
    new_event_on_match => false
  }
  if 'elapsed' in [tags] {
    aggregate {
      task_id => '%{Event}'
      code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
      map_action => 'create'
    }
  }
  mutate {
    remove_field => ['timestamp', 'ecs', 'log', 'tags', 'message', '@version', 'something', 'somethingtoo', 'filename', 'input', 'host', 'agent', 't', 'parent_type', 'parent_name', 'type']
    rename => {'elapsed_time' => 'Event_duration'}
  }
}
output {
  elasticsearch {
    hosts => ['localhost:9200']
    index => 'test'
  }
}
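For illustration, given the dissect mapping above and a hypothetical path such as C:\Program Files\Filebeat\test_logs\2020\dept\Ivanov\session1.txt (the folder names here are made up for the example), the extracted fields would be:

  somethingtoo => "2020"
  something    => "dept"
  User_Name    => "Ivanov"
  filename     => "session1"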
In my logstash.conf I use the aggregate filter, so I run Logstash with a single pipeline worker (-w 1) to make it work properly.
While I was testing and configuring against a single log file, -w 1 worked fine. But when I started collecting all the logs from every directory, problems began: the data is not indexed into Elasticsearch correctly (this is clearly visible in the strange numbers the aggregation produces).
I also tried setting worker: 1 in the logstash output section of filebeat.yml, but that didn't help either.
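For reference, the single pipeline worker can be set either on the command line or persistently in logstash.yml:

  # command line
  bin/logstash -f logstash.conf -w 1
  # or in logstash.yml
  pipeline.workers: 1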
Question: how can I get the elapsed and aggregate filters to produce correct results when Filebeat collects logs from all of these files at once?
Upvotes: 0
Views: 396
Reputation: 7463
You are using elapsed and aggregate with a field that is not unique: the Event field can hold the same value in different files, which can make the elapsed filter use the start event from one file and the end event from another.

This happens because Filebeat harvests files in parallel and ships the events to Logstash in bulk. The worker option in the config is of no use in your case; it controls the number of workers that ship the data, not how the data is collected.
You can try the option harvester_limit: 1 to limit the number of parallel harvesters, but this can slow down your data processing, and there is no guarantee that it won't mix up your filters. Also, Filebeat does not guarantee the order of events, only at-least-once delivery.
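If you do want to try it, harvester_limit is set per input in filebeat.yml; a sketch based on the question's input (the rest of the config stays the same):

  filebeat.inputs:
  - type: log
    paths:
      - C:\Program Files\Filebeat\test_logs\*\*\*\*.txt
    exclude_lines: ['^Infobase.+']
    harvester_limit: 1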
The best solution is to create a unique field by concatenating the Event field with the filename field; this way, events from different files won't be mixed up.

You can do that by adding a mutate filter after your dissect filter (which extracts filename) and before your elapsed filter.
mutate {
  add_field => { "uniqueEvent" => "%{Event}_%{filename}" }
}
This will create a field named uniqueEvent with a value like LClick_filename; you would then use this new field in your elapsed and aggregate filters.
If you have the same file name in different folders, you will need to pull another field from your path into uniqueEvent until its value is truly unique.
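For example, with the dissect filter from the question already extracting User_Name and filename, the whole change could look like this (a sketch; only the id fields differ from the original config, and User_Name is included as the extra path component mentioned above; drop it if plain filenames are already unique):

  # uniqueEvent combines the event name with path components so that
  # start/end pairs from different files never match each other
  mutate {
    add_field => { "uniqueEvent" => "%{Event}_%{User_Name}_%{filename}" }
  }
  elapsed {
    unique_id_field => 'uniqueEvent'
    start_tag => 'beg'
    end_tag => 'end'
    new_event_on_match => false
  }
  if 'elapsed' in [tags] {
    aggregate {
      task_id => '%{uniqueEvent}'
      code => 'map["duration"] = [(event.get("elapsed_time")*1000).to_i]'
      map_action => 'create'
    }
  }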
Upvotes: 1