Gurkeerat Sondhi

Reputation: 59

Read a CSV in Logstash and filter events based on the extracted data

I am using Metricbeat to collect process-level data and push it to Elasticsearch through Logstash.

The aim is to categorize the processes into two types: the running process is either a browser or something else.

I can do that statically with this configuration:


input {
  beats {
    port => 5044
  }
}
filter{
    if [process][name]=="firefox.exe" or [process][name]=="chrome.exe" {
        mutate {
            add_field => { "process.type" => "browsers" }
            convert => {
            "process.type" => "string"
            }
        }
    }
    else {
        mutate {
            add_field => { "process.type" => "other" }
        } 
    }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    # manage_template => false
    index => "metricbeatlogstash"
  }
}

But when I try to make that if condition dynamic by reading the process list from a CSV, I get no valid results in Kibana and no error from Logstash.

The configuration that reads the CSV is as follows:

input {
  beats {
    port => 5044
  }
  file{
        path=>"filePath"
        start_position=>"beginning"
        sincedb_path=>"NULL"
    }
}
filter{
    csv{
        separator=>","
        columns=>["processList","IT"]
    }
    if [process][name] in [processList] {
        mutate {
            add_field => { "process.type" => "browsers" }
            convert => {
            "process.type" => "string"
            }
        }
    }
    else {
        mutate {
            add_field => { "process.type" => "other" }
        } 
    }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    # manage_template => false
    index => "metricbeatlogstash2"
  }
}

Upvotes: 0

Views: 1294

Answers (1)

leandrojmp

Reputation: 7473

What you are trying to do does not work that way in Logstash, because the events in a Logstash pipeline are independent of each other.

The events received by your beats input know nothing about the events read from the CSV by your file input, so you can't use fields from different events in a conditional.

To do what you want, you can use the translate filter with the following config:

translate {
    field => "[process][name]"
    destination => "[process][type]"
    dictionary_path => "process.csv"
    fallback => "others"
    refresh_interval => 300
}

This filter checks the value of the field [process][name] against a dictionary loaded into memory from the file process.csv. The dictionary is a .csv file with two columns: the first is the name of the browser process and the second is always browser.

chrome.exe,browser
firefox.exe,browser

If the filter finds a match, it populates the field [process][type] (not process.type) with the value from the second column, which in this case is always browser. If there is no match, it populates [process][type] with the value of the fallback option, in this case others. It also reloads the content of process.csv every 300 seconds (5 minutes).
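
For reference, a complete pipeline built around this filter might look like the sketch below. It reuses the beats input and elasticsearch output from the question and replaces the conditional with the translate filter, so the file input and csv filter are no longer needed. The dictionary path /etc/logstash/process.csv is a placeholder assumption, not something from the question. Newer versions of the translate filter also name these options source and target instead of field and destination, so check which names your installed version expects.

```
input {
  beats {
    port => 5044
  }
}

filter {
  translate {
    # Field whose value is looked up in the dictionary.
    field => "[process][name]"
    # Field that receives the matched value, or the fallback.
    destination => "[process][type]"
    # Placeholder path (assumption): point this at the real CSV file.
    dictionary_path => "/etc/logstash/process.csv"
    # Value written when the process name is not in the dictionary.
    fallback => "others"
    # Re-read the dictionary file every 300 seconds.
    refresh_interval => 300
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "metricbeatlogstash2"
  }
}
```

With the default settings the lookup is an exact match on the whole value, so a process reported as Chrome.exe would likely not match a dictionary key of chrome.exe. Write the keys in the CSV exactly as Metricbeat reports them.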

Upvotes: 1
