oki doki

Reputation: 21

How to sort paginated logs by @timestamp with Elasticsearch?

My goal is to sort the millions of logs I receive out of Elasticsearch by timestamp.

Example logs:

{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}

Unfortunately, I am not able to get all the logs sorted out of Elasticsearch. It seems like I have to do it myself.

Approaches I have tried to get the data sorted out of Elasticsearch:

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch()
es = Search(index="somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
    print(hit['@timestamp'])

Another approach:

notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .scan()
)

So I am looking for a way to sort these logs myself or directly through Elasticsearch. Currently, I am saving all the data in a local 'logs.json', and it seems I have to iterate over it and sort it myself.

Upvotes: 1

Views: 3455

Answers (2)

Gino Mempin

Reputation: 29600

You should definitely let Elasticsearch do the sorting, then return the data to you already sorted.

The problem is that you are using .scan(). It uses Elasticsearch's scan/scroll API, which unfortunately only applies the sorting params on each page/slice, not the entire search result. This is noted in the elasticsearch-dsl docs on Pagination:

Pagination

...
If you want to access all the documents matched by your query you can use the scan method which uses the scan/scroll elasticsearch API:

for hit in s.scan():
    print(hit.title)

Note that in this case the results won’t be sorted.

(emphasis mine)

Using pagination is definitely an option, especially when you have "millions of logs" as you said. There is a search_after pagination API:

Search after

You can use the search_after parameter to retrieve the next page of hits using a set of sort values from the previous page.
...
To get the first page of results, submit a search request with a sort argument.
...
The search response includes an array of sort values for each hit.
...
To get the next page of results, rerun the previous search using the last hit’s sort values as the search_after argument. ... The search’s query and sort arguments must remain unchanged. If provided, the from argument must be 0 (default) or -1.
...
You can repeat this process to get additional pages of results.

(omitted the raw JSON requests since I'll show a sample in Python below)

Here's a sample of how to do it with elasticsearch-dsl for Python. Note that I'm limiting the fields and the number of results to make it easier to test. The important parts here are the sort and the extra(search_after=).

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch()  # connection settings for your cluster

search = Search(using=client, index='some-index')

# The main query
search = search.extra(size=100)
search = search.query('range', **{'@timestamp': {'gte': '2020-12-29T09:00', 'lt': '2020-12-29T09:59'}})
search = search.source(fields=('@timestamp', ))
search = search.sort({
    '@timestamp': {
        'order': 'desc'
    },
})

# Store all the results (it would be better to wrap all this in a generator for performance; see the sketch after the sample output)
hits = []

# Get the 1st page
results = search.execute()
hits.extend(results.hits)
total = results.hits.total  # on elasticsearch-dsl 7.x this is an object with .value and .relation
print(f'Expecting {total}')

# Get the next pages
# Real use-case condition should be "until total" or "until no more results.hits"
while len(hits) < 1000:  
    print(f'Now have {len(hits)}')
    last_hit_sort_id = hits[-1].meta.sort[0]
    search = search.extra(search_after=[last_hit_sort_id])
    results = search.execute()
    hits.extend(results.hits)

with open('results.txt', 'w') as out:
    for hit in hits:
        out.write(f'{hit["@timestamp"]}\n')

That produces already-sorted data:

# 1st 10 lines
2020-12-29T09:58:57.749Z
2020-12-29T09:58:55.736Z
2020-12-29T09:58:53.627Z
2020-12-29T09:58:52.738Z
2020-12-29T09:58:47.221Z
2020-12-29T09:58:45.676Z
2020-12-29T09:58:44.523Z
2020-12-29T09:58:43.541Z
2020-12-29T09:58:40.116Z
2020-12-29T09:58:38.206Z
...
# 250-260
2020-12-29T09:50:31.117Z
2020-12-29T09:50:27.754Z
2020-12-29T09:50:25.738Z
2020-12-29T09:50:23.601Z
2020-12-29T09:50:17.736Z
2020-12-29T09:50:15.753Z
2020-12-29T09:50:14.491Z
2020-12-29T09:50:13.555Z
2020-12-29T09:50:07.721Z
2020-12-29T09:50:05.744Z
2020-12-29T09:50:03.630Z 
...
# 675-685
2020-12-29T09:43:30.609Z
2020-12-29T09:43:30.608Z
2020-12-29T09:43:30.602Z
2020-12-29T09:43:30.570Z
2020-12-29T09:43:30.568Z
2020-12-29T09:43:30.529Z
2020-12-29T09:43:30.475Z
2020-12-29T09:43:30.474Z
2020-12-29T09:43:30.468Z
2020-12-29T09:43:30.418Z
2020-12-29T09:43:30.417Z
...
# 840-850
2020-12-29T09:43:27.953Z
2020-12-29T09:43:27.929Z
2020-12-29T09:43:27.927Z
2020-12-29T09:43:27.920Z
2020-12-29T09:43:27.897Z
2020-12-29T09:43:27.895Z
2020-12-29T09:43:27.886Z
2020-12-29T09:43:27.861Z
2020-12-29T09:43:27.860Z
2020-12-29T09:43:27.853Z
2020-12-29T09:43:27.828Z
...
# Last 3
2020-12-29T09:43:25.878Z
2020-12-29T09:43:25.876Z
2020-12-29T09:43:25.869Z 
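
As the comment in the sample notes, it would be more memory-friendly to wrap the paging loop in a generator when dealing with millions of hits. A minimal sketch of that idea, assuming the same sorted `search` object configured above (`scan_sorted` is an illustrative name, not a library function):

def scan_sorted(search, page_size=100):
    """Yield hits in sorted order, fetching one search_after page at a time."""
    search = search.extra(size=page_size)
    while True:
        results = search.execute()
        if not results.hits:
            return  # no more pages
        for hit in results.hits:
            yield hit
        # Feed the last hit's sort values into the next request
        search = search.extra(search_after=list(results.hits[-1].meta.sort))

for hit in scan_sorted(search):
    print(hit['@timestamp'])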

There are some considerations on using search_after as discussed in the API docs:

  • Use a point in time (PIT) parameter
    • If a refresh occurs between these requests, the order of your results may change, causing inconsistent results across pages. To prevent this, you can create a point in time (PIT) to preserve the current index state over your searches.

    • You need to first make a POST request to get a PIT ID
    • Then add an extra 'pit': {'id': xxxx, 'keep_alive': '5m'} parameter to every request
    • Make sure to use the PIT ID from the last response (see the sketch after this list)
  • Use a tiebreaker
    • We recommend you include a tiebreaker field in your sort. This tiebreaker field should contain a unique value for each document. If you don’t include a tiebreaker field, your paged results could miss or duplicate hits.

    • This would depend on your Document schema
      # Add some ID as a tiebreaker to the `sort` call
      search = search.sort(
          {'@timestamp': {
              'order': 'desc'
          }},
          {'some.id': {
              'order': 'desc'
          }}
      )
      
# Include both the @timestamp and the some.id sort values in `search_after`
      last_hit_sort_id, last_hit_route_id = hits[-1].meta.sort
      search = search.extra(search_after=[last_hit_sort_id, last_hit_route_id])
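
A sketch of the PIT flow mentioned in the first bullet, assuming elasticsearch-py 7.10+ (where open_point_in_time() is available); the connection settings and index name are placeholders:

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch()

# 1. POST request to get a PIT ID
pit = client.open_point_in_time(index='some-index', keep_alive='5m')
pit_id = pit['id']

# 2. Attach the PIT to every request; note there is no index on the
#    Search itself, since the PIT already pins the index state
search = Search(using=client)
search = search.extra(pit={'id': pit_id, 'keep_alive': '5m'})
search = search.sort({'@timestamp': {'order': 'desc'}})

results = search.execute()
# 3. Each response returns a (possibly updated) PIT ID; reuse the latest one
pit_id = results.pit_id

# ... page with search_after as shown above ...

# 4. Clean up the PIT when done
client.close_point_in_time(body={'id': pit_id})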
      

Upvotes: 2

oki doki

Reputation: 21

Thank you, Gino Mempin. It works!

But I also figured out that a simple change does the same job.

By adding .params(preserve_order=True), Elasticsearch will sort all the data.

es = Search(index="somelog-*").using(client)
notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .params(preserve_order=True)
    .scan()
)

Upvotes: 1
