Reputation: 49
I'm running an Elasticsearch 5.6 cluster that indexes about 70 GB per day. At the end of each day we have to produce hourly summaries for the last 7 days. We are using the Java High Level REST Client, and given the number of documents each query returns, it is critical to scroll through the results.
To take advantage of the CPUs we have and reduce the read time, we were thinking about using the asynchronous version of search scroll, but we are missing an example, or at least the logic behind it, to move forward.
We already checked the related Elastic documentation, but it is too vague:
We also asked in the Elastic discussion forum, as they suggest, but it looks like nobody can answer it:
Any help on this would be much appreciated; I'm surely not the only one with this requirement.
Upvotes: 2
Views: 4014
Reputation: 49
Here is the example code:
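import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;

public class App {
    public static void main(String[] args) throws IOException, InterruptedException {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(HttpHost.create("http://localhost:9200")));
        // Recreate the "test" index with 100 documents (the delete fails if the index does not exist yet).
        client.indices().delete(new DeleteIndexRequest("test"), RequestOptions.DEFAULT);
        for (int i = 0; i < 100; i++) {
            client.index(new IndexRequest("test", "_doc").source("foo", "bar"), RequestOptions.DEFAULT);
        }
        client.indices().refresh(new RefreshRequest("test"), RequestOptions.DEFAULT);
        // The first page comes from a regular search that opens the scroll.
        SearchRequest searchRequest = new SearchRequest("test").scroll(TimeValue.timeValueSeconds(30L));
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        System.out.println("response = " + searchResponse);
        SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                .scroll(TimeValue.timeValueSeconds(30));
        // I was missing the wait for the results: the latch blocks main until the listener is called.
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        client.scrollAsync(scrollRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
            @Override
            public void onResponse(SearchResponse searchResponse) {
                System.out.println("response async = " + searchResponse);
                countDownLatch.countDown(); // release the waiting main thread
            }
            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
                countDownLatch.countDown(); // release on failure too, otherwise await() never returns
            }
        });
        // Here we wait
        countDownLatch.await();
        // Clear the scroll if we finish before its keep-alive expires.
        // Otherwise it is cleared when the keep-alive time is reached.
        ClearScrollRequest request = new ClearScrollRequest();
        request.addScrollId(scrollId);
        // Synchronous call, so the scroll is cleared before the client is closed.
        client.clearScroll(request, RequestOptions.DEFAULT);
        client.close();
    }
}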
Thanks to David Pilato on the Elastic discussion forum.
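The example above requests a single scroll page. To read every page, each response can trigger the next scroll request from inside the listener, with the latch released only once a page comes back empty. Here is a minimal sketch of that control flow in plain Java, without any Elasticsearch dependency: `Listener`, `PageFetcher`, and `ScrollLoop` are hypothetical stand-ins (for `ActionListener` and `client.scrollAsync`), and the fake fetcher in `main` simply serves pages from memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ActionListener<SearchResponse>.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical stand-in for client.scrollAsync(...): delivers page number `page` to the listener.
interface PageFetcher {
    void fetchAsync(int page, Listener<List<String>> listener);
}

final class ScrollLoop {
    // Collects all hits by chaining one async request per page; blocks until the last page.
    static List<String> readAll(PageFetcher fetcher) {
        List<String> hits = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Exception> failure = new AtomicReference<>();
        requestPage(fetcher, 0, hits, done, failure);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pages", e);
        }
        if (failure.get() != null) {
            throw new RuntimeException("scroll failed", failure.get());
        }
        return hits;
    }

    private static void requestPage(PageFetcher fetcher, int page, List<String> hits,
                                    CountDownLatch done, AtomicReference<Exception> failure) {
        fetcher.fetchAsync(page, new Listener<List<String>>() {
            @Override
            public void onResponse(List<String> response) {
                if (response.isEmpty()) {   // an empty page means the scroll is exhausted
                    done.countDown();
                    return;
                }
                hits.addAll(response);
                requestPage(fetcher, page + 1, hits, done, failure); // chain the next request
            }
            @Override
            public void onFailure(Exception e) {
                failure.set(e);
                done.countDown();           // always release the waiter
            }
        });
    }

    public static void main(String[] args) {
        List<List<String>> pages = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        ExecutorService pool = Executors.newSingleThreadExecutor();
        PageFetcher fake = (page, listener) -> pool.execute(() ->
                listener.onResponse(page < pages.size() ? pages.get(page) : new ArrayList<String>()));
        System.out.println(readAll(fake)); // prints [a, b, c]
        pool.shutdown();
    }
}
```

With the real client, the page counter would presumably be replaced by the scroll ID carried in each response. Note that a single scroll is sequential by nature, so this keeps the calling thread free rather than reading pages in parallel.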
Upvotes: 2
Reputation: 1855
summarizations of each hour for the last 7 day
It sounds like you want to run aggregations on the data rather than fetch the raw docs. At the top level, that is probably a date histogram that aggregates on 1-hour intervals. Inside that date histogram you need inner aggregations, either metrics or buckets, depending on the summaries you need.
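As a rough illustration of that shape, the sketch below assembles a search body with an hourly `date_histogram` and one inner `avg` metric. The field names `@timestamp` and `value`, the aggregation names `per_hour` and `avg_value`, and the `HourlySummaryQuery` class are assumptions made up for the example; your mapping and the metrics you need will differ. `interval` is the parameter name used by the 5.x and 6.x versions discussed here.

```java
final class HourlySummaryQuery {
    // Builds a search body: last 7 days, one bucket per hour, one avg metric per bucket.
    static String build(String timestampField, String metricField) {
        return "{"
                + "\"size\":0," // return aggregation results only, no raw hits
                + "\"query\":{\"range\":{\"" + timestampField + "\":{\"gte\":\"now-7d/d\"}}},"
                + "\"aggs\":{\"per_hour\":{"
                + "\"date_histogram\":{\"field\":\"" + timestampField + "\",\"interval\":\"hour\"},"
                + "\"aggs\":{\"avg_value\":{\"avg\":{\"field\":\"" + metricField + "\"}}}"
                + "}}"
                + "}";
    }

    public static void main(String[] args) {
        // Field names are hypothetical; replace them with the ones from your mapping.
        System.out.println(build("@timestamp", "value"));
    }
}
```

Since `size` is 0, the response carries one bucket per hour instead of documents, so for 7 days that is 168 buckets and no scrolling is involved.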
Starting with Elasticsearch v6.1, you can use the Composite Aggregation to page through all the result buckets. From the docs I linked:
the composite aggregation can be used to paginate all buckets from a multi-level aggregation efficiently. This aggregation provides a way to stream all buckets of a specific aggregation similarly to what scroll does for documents.
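A minimal sketch of what paging with that aggregation could look like: the first request omits `after`, and each following request passes the key of the last bucket received. The names `hourly` and `hour`, the `@timestamp` field, and the `CompositePageQuery` class are assumptions for illustration only.

```java
final class CompositePageQuery {
    // afterKeyMillis == null asks for the first page; otherwise resume after that bucket key.
    static String build(String timestampField, int pageSize, Long afterKeyMillis) {
        String after = afterKeyMillis == null
                ? ""
                : ",\"after\":{\"hour\":" + afterKeyMillis + "}";
        return "{\"size\":0,\"aggs\":{\"hourly\":{\"composite\":{"
                + "\"size\":" + pageSize + "," // number of buckets per page
                + "\"sources\":[{\"hour\":{\"date_histogram\":{\"field\":\""
                + timestampField + "\",\"interval\":\"1h\"}}}]"
                + after
                + "}}}}";
    }

    public static void main(String[] args) {
        // First page, then a follow-up page that resumes after a (made-up) bucket key.
        System.out.println(build("@timestamp", 1000, null));
        System.out.println(build("@timestamp", 1000, 1507593600000L));
    }
}
```

You would keep requesting pages until a response comes back with no buckets.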
Unfortunately, this option doesn't exist before v6.1, so you'll either need to upgrade Elasticsearch to use it or find another way, such as splitting the work into multiple queries that together cover the 7-day requirement.
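For the multiple-queries route, one possible way to split the work is by time window. The sketch below cuts the last 7 days into consecutive hourly windows; each pair could then become a range filter (`gte` start, `lt` end) on its own query, and the queries can run independently. The `TimeWindows` class and the fixed end instant are assumptions for the example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class TimeWindows {
    // Splits [end - days, end) into consecutive, non-overlapping windows.
    // Each element is {startInclusive, endExclusive}.
    static List<Instant[]> split(Instant end, int days, Duration window) {
        if (window.isZero() || window.isNegative()) {
            throw new IllegalArgumentException("window must be positive");
        }
        List<Instant[]> windows = new ArrayList<>();
        Instant cursor = end.minus(Duration.ofDays(days));
        while (cursor.isBefore(end)) {
            Instant next = cursor.plus(window);
            if (next.isAfter(end)) {
                next = end; // clamp the last window to the overall end
            }
            windows.add(new Instant[] {cursor, next});
            cursor = next;
        }
        return windows;
    }

    public static void main(String[] args) {
        Instant end = Instant.parse("2018-01-08T00:00:00Z"); // arbitrary example date
        List<Instant[]> windows = split(end, 7, Duration.ofHours(1));
        System.out.println(windows.size());    // prints 168
        System.out.println(windows.get(0)[0]); // prints 2018-01-01T00:00:00Z
    }
}
```

Larger windows (for example one day each) mean fewer queries with bigger responses; which size works best depends on your data and cluster.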
Upvotes: 0