Reputation: 380
I have a dump of the IMDB database in the form of a CSV. The CSV looks like this:
name, movie, role
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
"'El Burro' Van Rankin, Jorge","Serafín (1999)",PLAYED_IN
.........
"A.S., Alwi","Rumah masa depan (1984)",PLAYED_IN
"A.S., Giri","Sumangali (1940)",PLAYED_IN
"A.S., Luis","Bob the Drag Queen: Bloodbath (2016)",PLAYED_IN
"A.S., Pragathi","Suli (2016)",PLAYED_IN
"A.S.F. Dancers, The","D' Lucky Ones! (2006)",PLAYED_IN
.........
My goal is to put the data into Elasticsearch, but I don't want duplicate actors, so I want to aggregate the movies each actor played in so that the dataset looks like this:
{
  "_index": "imdb13",
  "_type": "logs",
  "_id": "AVmw9JHCrsOFTsZwAmBm",
  "_score": 13.028783,
  "_source": {
    "@timestamp": "2017-01-18T09:42:15.149Z",
    "movie": [
      "Naomi and Ely's No Kiss List (2015)",
      "Staten Island Summer (2015/II)",
      "What Happened Last Night (2016)",
      ...
    ],
    "@version": "1",
    "name": "Abernethy, Kevin"
  }
}
So I am using Logstash to push the data into Elasticsearch. I use the aggregate plugin, and my configuration file is the following:
input {
  file {
    path => "/home/maeln/imdb-data/roles.csv"
    start_position => "beginning"
  }
}

filter {
  csv {
    columns => [ "name", "movie" ]
    remove_field => ["role", "message", "host", "column3", "path"]
    separator => ","
  }
  aggregate {
    task_id => "%{name}"
    code => "
      map['movie'] ||= []
      event.to_hash.each do |key,value|
        map[key] = value unless map.has_key?(key)
        map[key] << value if map[key].is_a?(Array)
      end
    "
    push_previous_map_as_event => true
    timeout => 30
    timeout_tags => ['aggregated']
  }
  if "aggregated" not in [tags] {
    drop {}
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "imdb13"
  }
}
But then, when I do a simple search on the index, every actor is duplicated, each document holding only one movie in the "movie" field, like this:
{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 149,
    "max_score": 13.028783,
    "hits": [
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBm",
        "_score": 13.028783,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.149Z",
          "movie": [
            "Naomi and Ely's No Kiss List (2015)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBq",
        "_score": 12.998644,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.149Z",
          "movie": [
            "Staten Island Summer (2015/II)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "imdb13",
        "_type": "logs",
        "_id": "AVmw9JHCrsOFTsZwAmBu",
        "_score": 12.998644,
        "_source": {
          "@timestamp": "2017-01-18T09:42:15.150Z",
          "movie": [
            "What Happened Last Night (2016)"
          ],
          "@version": "1",
          "name": "Abernethy, Kevin",
          "tags": [
            "aggregated"
          ]
        }
      },
.....
Is there a way to fix this?
The Logstash log with the --debug option (only partial; the whole log is around 1 GiB): paste (I put it on Pastebin because of the 30,000-character limit on Stack Overflow).
The last lines of the log:
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"role"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.900Z %{host} %{message}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"message"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"host"}
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.915Z %{host} %{message}}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"column3"}
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Christopher'
[2017-01-18T11:34:09,977][DEBUG][logstash.filters.csv ] filters/LogStash::Filters::CSV: removing field {:field=>"path"}
[2017-01-18T11:34:09,977][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,977][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["Tabi tabi po! (2001)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.csv ] Event after csv filter {:event=>2017-01-18T10:34:09.921Z %{host} %{message}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.911Z, "movie"=>["21 Jump Street (1987)"], "@version"=>"1", "name"=>"Ansley, Zachary", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anseth, Elias Moussaoui'
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Death Match: Fighting Fist of Samurai Joe (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.917Z, "movie"=>["The Diplomat Hotel (2013)"], "@version"=>"1", "name"=>"Anson, Alvin", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate create_timeout_event call with task_id 'Anson, Alvin'
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] output received {"event"=>{"@timestamp"=>2017-01-18T10:34:09.897Z, "movie"=>["Tayong dalawa (2009)"], "@version"=>"1", "name"=>"Anselmuccio, Alex", "tags"=>["aggregated"]}}
[2017-01-18T11:34:09,978][DEBUG][logstash.pipeline ] filter received {"event"=>{"path"=>"/home/maeln/Projets/oracle-of-bacon/imdb-data/roles.csv", "@timestamp"=>2017-01-18T10:34:09.900Z, "@version"=>"1", "host"=>"maeln-GE70-2PE", "message"=>"\"Ansfelt, Jacob\",\"Manden med de gyldne ører (2009)\",PLAYED_IN"}}
[2017-01-18T11:34:09,978][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Aggregate: adding tag {"tag"=>"aggregated"}
[2017-01-18T11:34:09,978][DEBUG][logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\n\t\t\t\tmap['movie'] ||= []\n\t\t\t\t\tevent.to_hash.each do |key,value|\n\t\t\t\t\tmap[key] = value unless map.has_key?(key)\n\t\t\t\t\tmap[key] << value if map[key].is_a?(Array)\n\t\t\t\tend\n\t\t\t\t"}
Pastebin with only the lines containing logstash.filters.aggregate: link
Upvotes: 0
Views: 416
Reputation: 217554
The issue you're facing relates to the fact that once a line is read, it is handed off to a filter+output worker thread.
If you have several CPUs, several of those threads will be processing your lines in parallel, so the output order is no longer guaranteed. More importantly, each of your aggregate
filters is local to a given worker thread, so it's entirely possible that several lines relating to the same actor (even if they arrive in order) get processed by different threads in parallel, and each thread then emits its own partially aggregated event for that actor.
One solution would be to run Logstash with the option -w 1
to create only a single worker thread, but you'll decrease throughput by doing so.
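A minimal sketch of that invocation, assuming the configuration above is saved as roles.conf (the file name is just an example):

# Hypothetical paths and file names; adjust to your installation.
# -w (--pipeline.workers) limits the pipeline to a single worker thread,
# so one aggregate map sees every line for a given actor.
bin/logstash -f roles.conf -w 1

If you are on Logstash 5.x, the same can be done persistently by setting pipeline.workers: 1 in logstash.yml instead of passing the flag on the command line.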
Upvotes: 1