hello_its_me

Reputation: 793

How to have an input of type MongoDB for Logstash

I know we can use files as input and output to a MongoDB database. But I have a collection in my MongoDB that I would like to use as an input so that I can use it with ES. Is this possible? Thank you.

Upvotes: 8

Views: 6746

Answers (5)

jk464

Reputation: 31

Note: as of version 4.8.2 of the MongoDB JDBC driver, the jdbc_driver_class has changed, so the correct input configuration is:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc4.8.2.jar"
    jdbc_driver_class => "Java::com.wisecoders.dbschema.mongodb.JdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
    jdbc_user => "devroot"
    jdbc_password => "devroot"
    schedule => "*/30 * * * * *"
    statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': new ISODate(:sql_last_value)}},{'_id': false});"
  }
}

Upvotes: 1

ostmond

Reputation: 609

I tried out Sergio Sánchez Sánchez's suggested solution and found the following updates and improvements:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc3.0.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
    jdbc_user => "devroot"
    jdbc_password => "devroot"
    schedule => "*/30 * * * * *"
    statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': new ISODate(:sql_last_value)}},{'_id': false});"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    action => "update"
    doc_as_upsert => true
    document_id => "%{[document][uuid]}"
    index => "processed_files"
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "password"
    ssl => true
    ssl_certificate_verification => false
    cacert => "/etc/logstash/keys/certificate.pem"
  }
}

Explanation:

  • The date comparison in MongoDB has to use new ISODate to convert :sql_last_value.

  • I'd like to use "update" instead of "create" to cover the update case as well. The query result from the jdbc input is wrapped in the field "document". Assuming you have a field with a unique value, e.g. "uuid", you have to use it as the document_id, because MongoDB's "_id" is not supported anyway.

  • If you have an embedded document that also has an "_id" field, you have to exclude it too, e.g.

    statement => "db.profiles.find({'updatedAt' : {'$gte': new ISODate(:sql_last_value)}}, {'_id': false, 'embedded_doc._id': false});"

Upvotes: 3

Sergio Sánchez Sánchez

I have had a similar problem: the logstash-input-mongodb plugin is fine, but it is very limited, and it also seems that it is no longer maintained, so I have opted for the logstash-integration-jdbc plugin.

I have followed these steps to sync a MongoDB collection with ES:

First, I have downloaded the JDBC driver for MongoDB developed by DBSchema that you can find here.

I have prepared a custom Dockerfile to integrate the driver and plugins as you can see below:

FROM docker.elastic.co/logstash/logstash:7.9.2

RUN mkdir /usr/share/logstash/drivers
COPY ./drivers/* /usr/share/logstash/drivers/

RUN logstash-plugin install logstash-integration-jdbc
RUN logstash-plugin install logstash-output-elasticsearch

I have configured a query that will be executed every 30 seconds and will look for documents with an insert timestamp later than the timestamp of the last query (provided via the parameter :sql_last_value):

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc2.3.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
    jdbc_user => "devroot"
    jdbc_password => "devroot"
    schedule => "*/30 * * * * *"
    statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': :sql_last_value}},{'_id': false});"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    action => "create"
    index => "processed_files"
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "password"
    ssl => true
    ssl_certificate_verification => false
    cacert => "/etc/logstash/keys/certificate.pem"
  }
}

Hope it can help someone, regards

Upvotes: 10

hello_its_me

Reputation: 793

So apparently, the short answer is No, it is not possible to have an input from a database in Logstash.

EDIT

@elssar thank you for your answer:

Actually, there is a 3rd party mongodb input for logstash - github.com/phutchins/logstash-input-mongodb – elssar
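For completeness, a minimal input block for that plugin could look like the sketch below; the URI, placeholder database path, collection name, and batch size are illustrative values in the style of the plugin's documentation, not taken from this thread:

input {
  mongodb {
    # Connection string to the source database (illustrative value)
    uri => "mongodb://localhost:27017/mydb"
    # The plugin tracks its read position in a local SQLite placeholder database
    placeholder_db_dir => "/opt/logstash-mongodb/"
    placeholder_db_name => "logstash_sqlite.db"
    # Collection to read from and how many documents to fetch per batch
    collection => "my_collection"
    batch_size => 5000
  }
}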

Upvotes: 1

Olly Cruickshank

Reputation: 6180

You could set up a river to pull data from MongoDB to Elasticsearch.

See the instructions here - http://www.codetweet.com/ubuntu-2/configuring-elasticsearch-mongodb/

Upvotes: 3
