Reputation: 1379
I am using the .From() and .Size() methods to retrieve all documents from Elasticsearch search results.
Below is a sample:
ISearchResponse<dynamic> bResponse = ObjElasticClient.Search<dynamic>(s => s.From(0).Size(25000).Index("accounts").AllTypes().Query(Query));
Recently I came across the scroll feature of Elasticsearch. It looks like a better approach than From() and Size(), specifically for fetching large amounts of data.
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
I am looking for an example of the scroll feature in the NEST API.
Can someone please provide a NEST example?
Thanks, Sameer
Upvotes: 16
Views: 23283
Reputation: 276
I took Frederick's answer and made it an extension method that leverages IAsyncEnumerable:
// Adapted from https://stackoverflow.com/a/56261657/1072030
public static async IAsyncEnumerable<T> ScrollAllAsync<T>(
    this IElasticClient elasticClient,
    string indexName,
    string scrollTimeoutMinutes = "2m",
    int scrollPageSize = 1000,
    [EnumeratorCancellation] CancellationToken ct = default
) where T : class
{
    var searchResponse = await elasticClient.SearchAsync<T>(
        sd => sd
            .Index(indexName)
            .From(0)
            .Take(scrollPageSize)
            .MatchAll()
            .Scroll(scrollTimeoutMinutes),
        ct);
    try
    {
        while (true)
        {
            // ServerError is null when the failure did not come from Elasticsearch itself,
            // so fall back to DebugInformation instead of dereferencing it blindly.
            if (!searchResponse.IsValid || string.IsNullOrEmpty(searchResponse.ScrollId))
                throw new Exception($"Search error: {searchResponse.ServerError?.Error?.Reason ?? searchResponse.DebugInformation}");
            if (!searchResponse.Documents.Any())
                break;
            foreach (var item in searchResponse.Documents)
            {
                yield return item;
            }
            searchResponse = await elasticClient.ScrollAsync<T>(scrollTimeoutMinutes, searchResponse.ScrollId, ct: ct);
        }
    }
    finally
    {
        // Release the server-side scroll context, but only if one was actually opened.
        if (!string.IsNullOrEmpty(searchResponse.ScrollId))
            await elasticClient.ClearScrollAsync(new ClearScrollRequest(searchResponse.ScrollId), ct: ct);
    }
}
I'm guessing we should really be using search_after instead of the scroll API, but meh.
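For comparison, here is a rough sketch of what a search_after version of the same extension method might look like. It is untested against a live cluster and rests on some assumptions: the class name SearchAfterExtensions, the method name SearchAfterAllAsync and the sortField parameter are made up for illustration, and sortField has to name a unique, sortable field in your mapping (a keyword or numeric id, for example). Unlike scroll, plain search_after does not hold a snapshot of the index, so documents written while you page can affect what you get back.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using Nest;

// Hypothetical helper class, not part of NEST.
public static class SearchAfterExtensions
{
    public static async IAsyncEnumerable<T> SearchAfterAllAsync<T>(
        this IElasticClient elasticClient,
        string indexName,
        string sortField, // assumption: a unique, sortable field in the index
        int pageSize = 1000,
        [EnumeratorCancellation] CancellationToken ct = default
    ) where T : class
    {
        object[] lastSortValues = null;
        while (true)
        {
            var after = lastSortValues; // local copy captured by the lambda
            var response = await elasticClient.SearchAsync<T>(sd =>
            {
                sd = sd
                    .Index(indexName)
                    .Size(pageSize)
                    .MatchAll()
                    .Sort(s => s.Ascending(sortField));
                // The first page has no cursor; later pages resume after the last hit.
                return after == null ? sd : sd.SearchAfter(after);
            }, ct);

            if (!response.IsValid)
                throw new Exception($"Search error: {response.DebugInformation}");
            if (!response.Hits.Any())
                yield break;

            foreach (var hit in response.Hits)
                yield return hit.Source;

            // The sort values of the last hit become the cursor for the next page.
            lastSortValues = response.Hits.Last().Sorts.ToArray();
        }
    }
}
```

You would consume it the same way as the scroll version, e.g. with await foreach over client.SearchAfterAllAsync<MyDoc>("accounts", "accountNumber"), where MyDoc and accountNumber are placeholders for your own document type and sort field. There is no scroll context to clear afterwards, which is one reason it may be the simpler option.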
Upvotes: 0
Reputation: 13211
I took the liberty of rewriting the fine answer from Michael to async and a bit less verbose (v. 6.x Nest):
public async Task<IList<T>> RockAndScroll<T>(
    string indexName,
    string scrollTimeoutMinutes = "2m",
    int scrollPageSize = 1000
) where T : class
{
    var searchResponse = await this.ElasticClient.SearchAsync<T>(sd => sd
        .Index(indexName)
        .From(0)
        .Take(scrollPageSize)
        .MatchAll()
        .Scroll(scrollTimeoutMinutes));
    var results = new List<T>();
    while (true)
    {
        // ServerError can be null (e.g. on a connection failure), so do not dereference it directly.
        if (!searchResponse.IsValid || string.IsNullOrEmpty(searchResponse.ScrollId))
            throw new Exception($"Search error: {searchResponse.ServerError?.Error?.Reason ?? searchResponse.DebugInformation}");
        if (!searchResponse.Documents.Any())
            break;
        results.AddRange(searchResponse.Documents);
        searchResponse = await this.ElasticClient.ScrollAsync<T>(scrollTimeoutMinutes, searchResponse.ScrollId);
    }
    await this.ElasticClient.ClearScrollAsync(new ClearScrollRequest(searchResponse.ScrollId));
    return results;
}
Upvotes: 7
Reputation: 483
Here's an example of using scroll with NEST and C#. It works with 5.x and 6.x.
public IEnumerable<T> GetAllDocumentsInIndex<T>(string indexName, string scrollTimeout = "2m", int scrollSize = 1000) where T : class
{
    ISearchResponse<T> initialResponse = this.ElasticClient.Search<T>
        (scr => scr.Index(indexName)
            .From(0)
            .Take(scrollSize)
            .MatchAll()
            .Scroll(scrollTimeout));
    List<T> results = new List<T>();
    // ServerError can be null (e.g. on a connection failure), so do not dereference it directly.
    if (!initialResponse.IsValid || string.IsNullOrEmpty(initialResponse.ScrollId))
        throw new Exception(initialResponse.ServerError?.Error?.Reason ?? initialResponse.DebugInformation);
    if (initialResponse.Documents.Any())
        results.AddRange(initialResponse.Documents);
    string scrollid = initialResponse.ScrollId;
    bool isScrollSetHasData = true;
    while (isScrollSetHasData)
    {
        ISearchResponse<T> loopingResponse = this.ElasticClient.Scroll<T>(scrollTimeout, scrollid);
        if (loopingResponse.IsValid)
        {
            results.AddRange(loopingResponse.Documents);
            scrollid = loopingResponse.ScrollId;
        }
        isScrollSetHasData = loopingResponse.Documents.Any();
    }
    this.ElasticClient.ClearScroll(new ClearScrollRequest(scrollid));
    return results;
}
It's from: http://telegraphrepaircompany.com/elasticsearch-nest-scroll-api-c/
Upvotes: 27
Reputation: 9969
NEST's internal Reindex implementation uses scroll to move documents from one index to another, so it should be a good starting point.
Below is the relevant code from GitHub:
var page = 0;
var searchResult = this.CurrentClient.Search<T>(
    s => s
        .Index(fromIndex)
        .AllTypes()
        .From(0)
        .Size(size)
        .Query(this._reindexDescriptor._QuerySelector ?? (q=>q.MatchAll()))
        .SearchType(SearchType.Scan)
        .Scroll(scroll)
    );
if (searchResult.Total <= 0)
    throw new ReindexException(searchResult.ConnectionStatus, "index " + fromIndex + " has no documents!");
IBulkResponse indexResult = null;
do
{
    var result = searchResult;
    searchResult = this.CurrentClient.Scroll<T>(s => s
        .Scroll(scroll)
        .ScrollId(result.ScrollId)
    );
    if (searchResult.Documents.HasAny())
        indexResult = this.IndexSearchResults(searchResult, observer, toIndex, page);
    page++;
} while (searchResult.IsValid && indexResult != null && indexResult.IsValid && searchResult.Documents.HasAny());
You can also take a look at the integration test for Scroll:
[Test]
public void SearchTypeScan()
{
    var scanResults = this.Client.Search<ElasticsearchProject>(s => s
        .From(0)
        .Size(1)
        .MatchAll()
        .Fields(f => f.Name)
        .SearchType(SearchType.Scan)
        .Scroll("2s")
    );
    Assert.True(scanResults.IsValid);
    Assert.False(scanResults.FieldSelections.Any());
    Assert.IsNotNullOrEmpty(scanResults.ScrollId);
    var results = this.Client.Scroll<ElasticsearchProject>(s=>s
        .Scroll("4s")
        .ScrollId(scanResults.ScrollId)
    );
    var hitCount = results.Hits.Count();
    while (results.FieldSelections.Any())
    {
        Assert.True(results.IsValid);
        Assert.True(results.FieldSelections.Any());
        Assert.IsNotNullOrEmpty(results.ScrollId);
        var localResults = results;
        results = this.Client.Scroll<ElasticsearchProject>(s=>s
            .Scroll("4s")
            .ScrollId(localResults.ScrollId));
        hitCount += results.Hits.Count();
    }
    Assert.AreEqual(scanResults.Total, hitCount);
}
Upvotes: 7