Reputation: 231
I am reading checkpoint log file with csv format with logstash and some fields have null value.
i want to remove all fields with null value.
i can not foresee exactly which fields(keys) will have null value because i have 150 columns in the csv file and i dont want check each one of them.
is it possible to do a dynamic filter in logstash that will remove any fields with null value?
my logstash configuration file look like that:
input {
stdin { tags => "checkpoint" }
file {
type => "file-input"
path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
start_position => "beginning"
tags => ["checkpoint","offline"]
}
}
filter {
if "checkpoint" in [tags] {
csv {
columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
# remove_field => [ any fields with null value] how to do it please
separator => "|"
}
# drop csv header
if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
drop { }
}
}
}
}
output {
stdout {
codec => rubydebug
}
file {
path => "output.txt"
}
HERE I ATTACH SOME LOGS EXAMPLE:
num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration
0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Upvotes: 8
Views: 18022
Reputation: 71
ruby {
init => "
def removeEmptyField(event,h,name)
h.each do |k,v|
if (v.is_a?(Hash) || v.is_a?(Array)) && v.to_s != '{}'
removeEmptyField(event,v,String.new(name.to_s) << '[' << k.to_s << ']')
else
if v == '' || v.to_s == '{}'
event.remove(String.new(name.to_s) << '[' << k.to_s << ']')
end
end
end
end
"
code => "
removeEmptyField event,event.to_hash,''
"
}
Upvotes: 1
Reputation: 469
If you need to remove all null, blank, and empty fields recursively (0
and false
remain), this function might be able to help. It uses the Ruby filter in Logstash. It's by no means elegant, but seems to work pretty effectively.
filter {
ruby {
init => "
def Compact(key)
modifiedKey = nil
parentKey = nil
if key.kind_of?(String)
if key.start_with?('[')
modifiedKey = key
else
modifiedKey = key.sub( /([^\[^\]]*)/, '[\1]')
end
parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
end
unless modifiedKey.nil?
if event.get(modifiedKey).is_a?(Enumerable) &&
(event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
event.remove(modifiedKey)
elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
event.remove(modifiedKey)
end
if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
(event.get(parentKey).nil? || event.get(parentKey).empty?)
event.remove(parentKey)
end
end
if key == event.to_hash ||
event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
key = event.get(modifiedKey) unless modifiedKey.nil?
key.each{ |k|
Compact(%{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
}
end
rescue Exception => e
puts %{ruby_exception_#{__method__.to_s} - #{e}}
end
"
code => "
Compact(event.to_hash)
"
}
}
Upvotes: 1
Reputation: 483
Check the skip_empty_columns
option of the csv
filter - was really helpful in my use case. :)
Usage:
skip_empty_columns => true
Upvotes: 1
Reputation: 7890
Ruby
filter can meet your requirement.
input {
stdin {
}
}
filter {
csv {
columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
separator => "|"
}
ruby {
code => "
hash = event.to_hash
hash.each do |k,v|
if v == nil
event.remove(k)
end
end
"
}
}
output {
stdout { codec => rubydebug }
}
You can use ruby plugin to filter all the field with nil
value (null in Ruby)
Updated:
This is my environment: Windows server 2008 and Logstash 1.4.1. Your logs sample is work at me! I have updated the configuration, input and output.
Input
2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Output:
{
"@version" => "1",
"@timestamp" => "2015-03-12T00:30:34.123Z",
"host" => "BENLIM",
"num" => "2",
"date" => "8Jun2012",
"time" => "16:52:39",
"orig" => "10.0.0.1",
"type" => "log",
"action" => "keyinst",
"i/f_name" => "daemon",
"i/f_dir" => "inbound",
"product" => "VPN-1 & FireWall-1",
"Internal_CA:" => "Certificate initialized",
"serial_num:" => "86232",
"dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
}
Upvotes: 10
Reputation: 16362
To do things this dynamically, you'll want to use the ruby{} filter. There's some good sample code in this answer.
Upvotes: 0