user3738017

Reputation: 47

Elasticsearch not getting all documents from logstash

Logstash version 8.6.0, Elasticsearch version 7.17.8.

The total number of documents that should be created in Elasticsearch is 28000.

The steps I tried 10 times:

  1. Remove the index in Elasticsearch.
  2. Start Logstash.
  3. Wait until the logs show that the pipeline has finished: [2023-03-31T10:21:56,427][INFO ][logstash.javapipeline ][index] Pipeline terminated {"pipeline.id"=>"index"}. No errors or warnings in the Logstash logs.
  4. Check in Kibana whether there are 28000 documents (for example with the count query shown after this list).
  5. Check the Elasticsearch logs. No errors or warnings there either.
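
To check the count outside Kibana I can ask Elasticsearch directly (a sketch; the host and index name are placeholders for my real ones):

curl -XGET 'http://localhost:9200/myindex/_count?pretty'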

I have dead_letter_queue.enable: true but it's empty.
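
To double-check that the DLQ really is empty I can read it back with the dead_letter_queue input plugin (a minimal sketch; the DLQ path is the default location under my Logstash data directory and may differ on other installs):

input {
    dead_letter_queue {
        # assumed default DLQ location under the Logstash data directory
        path => "/usr/share/logstash/data/dead_letter_queue"
        # pipeline id from my setup
        pipeline_id => "index"
        commit_offsets => false
    }
}
output {
    stdout { codec => rubydebug }
}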

What I can see from the Retry Policy documentation is: "400 and 404 errors are sent to the dead letter queue (DLQ), if enabled. If a DLQ is not enabled, a log message will be emitted, and the event will be dropped. See DLQ Policy for more info. 409 errors (conflict) are logged as a warning and dropped."

This makes me believe that I don't have 400, 404, or 409 errors, since I don't get anything in the DLQ or any warnings in the logs.

Is there a way to see whether everything has been sent to Elasticsearch, or whether Elasticsearch is dropping the documents?
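
For reference, this is how I look at the index from the Elasticsearch side (a sketch; host and index name are placeholders). If indexing.index_total is close to 28000 but docs.count is lower, documents are probably overwriting each other because they end up with the same document_id:

curl -XGET 'http://localhost:9200/myindex/_stats/docs,indexing?pretty'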

From the Logstash logs it seems like Logstash has sent all documents to Elasticsearch:

...
[2023-03-31T10:21:55,645][INFO ][logstash.inputs.jdbc     ][index][d9299291c383ba4e70e5116a1b01ba758087b0cff45119233811238f41d0be0d] (0.006227s) SELECT * FROM (
   SELECT
       *,
       'PERSON' + CAST(PERSON.Id AS VARCHAR(20)) AS uniqueid
       FROM PERSON) AS [T1] ORDER BY 1 OFFSET 14000 ROWS FETCH NEXT 1000 ROWS ONLY
[2023-03-31T10:21:56,427][INFO ][logstash.javapipeline    ][index] Pipeline terminated {"pipeline.id"=>"index"}
My Logstash pipeline configuration:

input {
   jdbc {
       jdbc_connection_string => "#connectionstring"
       jdbc_user => "#username"
       jdbc_password => "#password"
       jdbc_driver_library => "./logstash-core/lib/jars/mssql-jdbc-11.2.3.jre18.jar"
       jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
       clean_run => true
       record_last_run => false
       jdbc_paging_enabled => true
       jdbc_page_size => 1000
       jdbc_fetch_size => 1000
       statement => "
       SELECT
       *,
       'PERSON' + CAST(PERSON.Id AS VARCHAR(20)) AS uniqueid
       FROM PERSON"
   }
   jdbc {
       jdbc_connection_string => "#connectionstring"
       jdbc_user => "#username"
       jdbc_password => "#password"
       jdbc_driver_library => "./logstash-core/lib/jars/mssql-jdbc-11.2.3.jre18.jar"
       jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
       clean_run => true
       record_last_run => false
       jdbc_paging_enabled => true
       jdbc_page_size => 1000
       jdbc_fetch_size => 1000
       statement => "
       SELECT
       *,
       'INVOICE' + CAST(INVOICE.Id AS VARCHAR(20)) AS uniqueid
       FROM INVOICE"
   }
}
output {
   elasticsearch {
       hosts => ["#elasticsearchurl"]
       index => "#elasticindex"
       action => "index"
       document_id => "%{uniqueid}"
   }
}
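
For debugging I have also considered adding a second output next to the elasticsearch one, so every event is also written to a file and can be counted afterwards (a sketch; the path is just an example):

output {
    # the existing elasticsearch output stays unchanged
    # debug only: write every event to a file, then count lines with wc -l
    file {
        path => "/tmp/logstash-debug.ndjson"
        codec => json_lines
    }
}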

Upvotes: 0

Views: 239

Answers (1)

Musab Dogan

Reputation: 3690

You can use the Logstash node stats API to understand where the problem is.

curl -XGET 'localhost:9600/_node/stats/events?pretty'

Output:

{
  "events" : {
    "in" : 28000,
    "filtered" : 28000,
    "out" : 28000,
    "duration_in_millis" : 2324391,
    "queue_push_duration_in_millis" : 343816
  }
}

Check events.in and events.out and make sure both show 28000 events. If not, the data may have been dropped during the filter stage.
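
If those totals look right, the per-pipeline stats go one level deeper and show the counters of the elasticsearch output plugin itself (a sketch; the pipeline id "index" is taken from the question's logs, and the exact field names vary a bit between Logstash versions):

curl -XGET 'localhost:9600/_node/stats/pipelines/index?pretty'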


Upvotes: 0
