Verv

Reputation: 2473

Filtering logstash input

I have the following Logstash pipeline, which fetches records from an Oracle database using the jdbc input plugin and outputs them into an Elasticsearch index.

input {
    jdbc {
        id => "nouvelle-demande"

        jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
        jdbc_user => "${JDBC_USER}"
        jdbc_password => "${JDBC_PASSWORD}"
        jdbc_driver_library => "${JDBC_DRIVER_LIBRARY}"
        jdbc_driver_class => "${JDBC_DRIVER_CLASS}"
        jdbc_validate_connection => true
        jdbc_fetch_size => 512
        clean_run => "${CLEAN_RUN}"

        schedule => "${NOUVELLE_DEMANDE_SCHEDULE}"
        statement_filepath => "/usr/share/logstash/query/nouvelle-demande.sql"
        use_column_value => true
        tracking_column => "id"
        last_run_metadata_path => "/usr/share/logstash/nouvelle-demande-logstash_jdbc_last_run"
        tags => ["nouvelle-demande"]

    }
}

output {
    if "nouvelle-demande" in [tags] { 
        elasticsearch {
            id => "nouvelle-demande-output"
            hosts => ["elasticsearch:9200"]
            index => "nouvelle-demande"
            document_type => "demande"
            document_id => "%{id}"
        }
    }
}

The Elasticsearch index is then used as a processing queue by a Java application. Once a record has been processed, the Java application deletes it from the queue index and adds it to another Elasticsearch index, which acts as a log of processed records. That log index is never purged; it is the processing history.

What I would like is for Logstash to ignore the records that are already present in my history index. The processing loop in my Java application does check whether a record is already in the history index and skips it if so, but my queue still contains all of these already-processed records, which bloats processing since the queue keeps growing every time my Logstash schedule runs.

I've had a look at Logstash's elasticsearch filter plugin, along with the other available filter plugins, but none of them seems able to achieve what I'm trying to do. I've also done a lot of Googling but can't find anything that fits my need; I guess it's unconventional.

The data model isn't exactly the same between the two indexes, but the id will be the same in both indexes.

Note: yes, I could do the filtering in the Java application itself, but I'd like this filtering to be handled automatically by Logstash; handling it in the application would be my last-resort solution.

Thanks.

Upvotes: 0

Views: 93

Answers (1)

TheFiddlerWins

Reputation: 922

You can use the Logstash ruby filter to do something like this. The problem is that it will have to query Elasticsearch every time a record comes in, which is potentially a massive load.

https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html
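For illustration, here is a minimal sketch of that approach. It assumes the history index is named demande-historique (a placeholder; the question doesn't give its name) and is reachable at the same elasticsearch:9200 host as the output; adjust the index name and document path to your mapping and Elasticsearch version:

filter {
    if "nouvelle-demande" in [tags] {
        ruby {
            init => "require 'net/http'"
            code => "
                id = event.get('id')
                # GET /<history-index>/_doc/<id> returns 200 if the document
                # exists, 404 otherwise. 'demande-historique' is a placeholder;
                # use the actual name of your history index.
                uri = URI('http://elasticsearch:9200/demande-historique/_doc/' + id.to_s)
                response = Net::HTTP.get_response(uri)
                # Already processed: cancel the event so it never reaches
                # the elasticsearch output.
                event.cancel if response.code == '200'
            "
        }
    }
}

Since this fires one HTTP round trip per event, if the load becomes a problem you could instead have the scheduled SQL query filter on :sql_last_value (the jdbc input is already tracking the id column), so rows that are already queued aren't re-fetched in the first place.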

Upvotes: 1
