Sameer Deshmukh

Reputation: 1379

Scroll example in ElasticSearch NEST API

I am using the .From() and .Size() methods to retrieve all documents from an Elasticsearch result set.

Below is a sample:

ISearchResponse<dynamic> bResponse = ObjElasticClient.Search<dynamic>(s => s.From(0).Size(25000).Index("accounts").AllTypes().Query(Query));

Recently I came across the scroll feature of Elasticsearch. It looks like a better approach than From() and Size(), specifically for fetching large amounts of data.

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

I am looking for an example of the scroll feature in the NEST API.

Can someone please provide a NEST example?

Thanks, Sameer

Upvotes: 16

Views: 23283

Answers (4)

Chase Miller

Reputation: 276

I took Frederick's answer and made it an extension method that leverages IAsyncEnumerable:

// Adapted from https://stackoverflow.com/a/56261657/1072030
public static async IAsyncEnumerable<T> ScrollAllAsync<T>(
    this IElasticClient elasticClient,
    string indexName,
    string scrollTimeoutMinutes = "2m",
    int scrollPageSize = 1000,
    [EnumeratorCancellation] CancellationToken ct = default
) where T : class
{
    var searchResponse = await elasticClient.SearchAsync<T>(
        sd => sd
            .Index(indexName)
            .From(0)
            .Take(scrollPageSize)
            .MatchAll()
            .Scroll(scrollTimeoutMinutes),
        ct);
    
    try
    {
        while (true)
        {
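            // ServerError is null for client-side failures, so fall back to DebugInformation.
            if (!searchResponse.IsValid || string.IsNullOrEmpty(searchResponse.ScrollId))
                throw new Exception($"Search error: {searchResponse.ServerError?.Error?.Reason ?? searchResponse.DebugInformation}");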

            if (!searchResponse.Documents.Any())
                break;

            foreach(var item in searchResponse.Documents)
            {
                yield return item;
            }
        
            searchResponse = await elasticClient.ScrollAsync<T>(scrollTimeoutMinutes, searchResponse.ScrollId, ct: ct);
        }
    }
    finally
    {
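        // Release the server-side scroll context. The token is deliberately not passed,
        // so cleanup still runs when enumeration was cancelled.
        if (!string.IsNullOrEmpty(searchResponse.ScrollId))
            await elasticClient.ClearScrollAsync(new ClearScrollRequest(searchResponse.ScrollId));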
    }
}
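
For completeness, here is a rough sketch of how the extension method above might be consumed. It assumes the method is declared in a static class that is in scope, that a cluster is reachable at http://localhost:9200, and that Account is a hypothetical document type; none of those come from the original answer.

```csharp
using System;
using System.Threading.Tasks;
using Nest;

// Hypothetical document type; replace with your own class.
public class Account
{
    public string Id { get; set; }
    public string Name { get; set; }
}

public static class Program
{
    public static async Task Main()
    {
        // Assumption: a local cluster; adjust the URI for your setup.
        var settings = new ConnectionSettings(new Uri("http://localhost:9200"));
        var client = new ElasticClient(settings);

        var count = 0;

        // Pages are fetched lazily as the loop advances, so the whole index
        // is never held in memory at once.
        await foreach (var account in client.ScrollAllAsync<Account>("accounts"))
        {
            count++;
        }

        Console.WriteLine($"Read {count} documents");
    }
}
```

Leaving the await foreach early (break, return or an exception) disposes the async enumerator, which runs the finally block in the iterator and clears the scroll.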

I'm guessing we should really be using search_after instead of the scroll api, but meh.
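
For comparison, a minimal sketch of what paging with search_after could look like in NEST. This is not from the original answer: Account is a hypothetical type, and the sketch assumes its Id field is unique and mapped as a sortable (keyword) field, since search_after needs a stable sort with a unique tiebreaker.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Nest;

// Hypothetical document type; Id is assumed unique and mapped as keyword.
public class Account
{
    public string Id { get; set; }
}

public static class SearchAfterExample
{
    public static async Task<List<Account>> ReadAllAsync(
        IElasticClient client, string indexName, int pageSize = 1000)
    {
        var results = new List<Account>();
        object[] lastSortValues = null; // sort values of the last hit on the previous page

        while (true)
        {
            var response = await client.SearchAsync<Account>(s =>
            {
                s = s.Index(indexName)
                     .Size(pageSize)
                     .MatchAll()
                     .Sort(so => so.Ascending(a => a.Id));

                // The first page has no cursor; later pages resume after the last hit.
                return lastSortValues == null ? s : s.SearchAfter(lastSortValues);
            });

            if (!response.IsValid)
                throw new Exception($"Search error: {response.DebugInformation}");

            if (response.Hits.Count == 0)
                break;

            results.AddRange(response.Documents);
            lastSortValues = response.Hits.Last().Sorts.ToArray();
        }

        return results;
    }
}
```

Unlike scroll, this keeps no search context open on the server, so there is nothing to clear afterwards. On its own it also does not read from a fixed snapshot, so documents indexed while paging may or may not show up.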

Upvotes: 0

I took the liberty of rewriting Michael's fine answer to be async and a bit less verbose (NEST 6.x):

public async Task<IList<T>> RockAndScroll<T>(
    string indexName,
    string scrollTimeoutMinutes = "2m",
    int scrollPageSize = 1000
) where T : class
{
    var searchResponse = await this.ElasticClient.SearchAsync<T>(sd => sd
        .Index(indexName)
        .From(0)
        .Take(scrollPageSize)
        .MatchAll()
        .Scroll(scrollTimeoutMinutes));

    var results = new List<T>();

    while (true)
    {
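        // ServerError is null for client-side failures, so fall back to DebugInformation.
        if (!searchResponse.IsValid || string.IsNullOrEmpty(searchResponse.ScrollId))
            throw new Exception($"Search error: {searchResponse.ServerError?.Error?.Reason ?? searchResponse.DebugInformation}");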

        if (!searchResponse.Documents.Any())
            break;

        results.AddRange(searchResponse.Documents);
        searchResponse = await ElasticClient.ScrollAsync<T>(scrollTimeoutMinutes, searchResponse.ScrollId);
    }

    await this.ElasticClient.ClearScrollAsync(new ClearScrollRequest(searchResponse.ScrollId));

    return results;
}

Upvotes: 7

SomeGuy1989

Reputation: 483

Here's an example of using scroll with NEST and C#. It works with NEST 5.x and 6.x.

public IEnumerable<T> GetAllDocumentsInIndex<T>(string indexName, string scrollTimeout = "2m", int scrollSize = 1000) where T : class
{
    // Open the scroll context and fetch the first page.
    ISearchResponse<T> initialResponse = this.ElasticClient.Search<T>(scr => scr
        .Index(indexName)
        .From(0)
        .Take(scrollSize)
        .MatchAll()
        .Scroll(scrollTimeout));

    List<T> results = new List<T>();

    // ServerError is null for client-side failures, so fall back to DebugInformation.
    if (!initialResponse.IsValid || string.IsNullOrEmpty(initialResponse.ScrollId))
        throw new Exception(initialResponse.ServerError?.Error?.Reason ?? initialResponse.DebugInformation);

    if (initialResponse.Documents.Any())
        results.AddRange(initialResponse.Documents);

    string scrollid = initialResponse.ScrollId;
    bool isScrollSetHasData = true;
    while (isScrollSetHasData)
    {
        // Each call returns the next page plus the scroll id to use for the following call.
        ISearchResponse<T> loopingResponse = this.ElasticClient.Scroll<T>(scrollTimeout, scrollid);
        if (loopingResponse.IsValid)
        {
            results.AddRange(loopingResponse.Documents);
            scrollid = loopingResponse.ScrollId;
        }
        isScrollSetHasData = loopingResponse.Documents.Any();
    }

    // Release the scroll context on the server.
    this.ElasticClient.ClearScroll(new ClearScrollRequest(scrollid));
    return results;
}

It's from: http://telegraphrepaircompany.com/elasticsearch-nest-scroll-api-c/

Upvotes: 27

Rob

Reputation: 9969

The internal implementation of NEST's Reindex uses scroll to move documents from one index to another.

It should be a good starting point.

Below is the relevant code from GitHub:

var page = 0;
var searchResult = this.CurrentClient.Search<T>(
    s => s
        .Index(fromIndex)
        .AllTypes()
        .From(0)
        .Size(size)
        .Query(this._reindexDescriptor._QuerySelector ?? (q=>q.MatchAll()))
        .SearchType(SearchType.Scan)
        .Scroll(scroll)
    );
if (searchResult.Total <= 0)
    throw new ReindexException(searchResult.ConnectionStatus, "index " + fromIndex + " has no documents!");
IBulkResponse indexResult = null;
do
{
    var result = searchResult;
    searchResult = this.CurrentClient.Scroll<T>(s => s
        .Scroll(scroll)
        .ScrollId(result.ScrollId)
    );
    if (searchResult.Documents.HasAny())
        indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
    page++;
} while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());

You can also take a look at the integration test for Scroll:

[Test]
public void SearchTypeScan()
{
    var scanResults = this.Client.Search<ElasticsearchProject>(s => s
        .From(0)
        .Size(1)
        .MatchAll()
        .Fields(f => f.Name)
        .SearchType(SearchType.Scan)
        .Scroll("2s")
    );
    Assert.True(scanResults.IsValid);
    Assert.False(scanResults.FieldSelections.Any());
    Assert.IsNotNullOrEmpty(scanResults.ScrollId);

    var results = this.Client.Scroll<ElasticsearchProject>(s=>s
        .Scroll("4s") 
        .ScrollId(scanResults.ScrollId)
    );
    var hitCount = results.Hits.Count();
    while (results.FieldSelections.Any())
    {
        Assert.True(results.IsValid);
        Assert.True(results.FieldSelections.Any());
        Assert.IsNotNullOrEmpty(results.ScrollId);
        var localResults = results;
        results = this.Client.Scroll<ElasticsearchProject>(s=>s
            .Scroll("4s")
            .ScrollId(localResults.ScrollId));
        hitCount += results.Hits.Count();
    }
    Assert.AreEqual(scanResults.Total, hitCount);
}

Upvotes: 7
