Lukasz

Reputation: 2616

Elasticsearch scroll upper limit - python api

Is there a way, using the Python API, to set an upper limit on the number of documents retrieved when scrolling in chunks of a specific size? Say I want a maximum of 100K documents scrolled in chunks of 2K, where over 10 million documents are available.

I've implemented a counter-like object, but I want to know if there is a more natural solution.

from elasticsearch import Elasticsearch

es_query = {"query": {"function_score": {"functions": [{"random_score": {"seed": "1234"}}]}}}
es = Elasticsearch(ADDRESS, port=PORT)


result = es.search(
    index="INDEX", 
    doc_type="DOC_TYPE", 
    body=es_query,
    size=2000,
    scroll="1m")

data = []
for hit in result["hits"]["hits"]:
    for d in hit["_source"]["attributes"]["data_of_interest"]:
        data.append(d)
        do_something(*args)


scroll_id = result['_scroll_id']
scroll_size = result["hits"]["total"]

i = 0
while(scroll_size>0):
    if i % 10000 == 0:
        print("Scrolling ({})...".format(i))

    result = es.scroll(scroll_id=scroll_id, scroll="1m")
    scroll_id = result["_scroll_id"]
    scroll_size = len(result['hits']['hits'])

data = []
for hit in result["hits"]["hits"]:
    for d in hit["_source"]["attributes"]["data_of_interest"]:
        data.append(d)
        do_something(*args)

i += 1
if i == 100000:
    break

Upvotes: 1

Views: 2137

Answers (1)

MCMZL

Reputation: 1146

To me, if you only want the first 100K documents, you should narrow your query in the first place; that will speed up your process. You can add a filter on date, for example.
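As a sketch of that suggestion, the `random_score` query could be combined with a `range` filter in a `bool` query. The field name `timestamp` and the date range here are assumptions for illustration, not part of the original index:

```python
# Hypothetical narrowed query: keep the random_score scoring, but only
# match documents whose (assumed) "timestamp" field falls in a date range.
es_query = {
    "query": {
        "bool": {
            "filter": [
                {"range": {"timestamp": {"gte": "2018-01-01", "lt": "2018-02-01"}}}
            ],
            "must": [
                {"function_score": {"functions": [{"random_score": {"seed": "1234"}}]}}
            ],
        }
    }
}
```

Because `filter` clauses are not scored and can be cached, this cuts the result set down before any scrolling starts.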

Regarding the code, I do not know of another way than using a counter. I would just correct the indentation and remove the `if` statement for readability.

from elasticsearch import Elasticsearch

es_query = {"query": {"function_score": {"functions": [{"random_score": {"seed": "1234"}}]}}}
es = Elasticsearch(ADDRESS, port=PORT)


result = es.search(
    index="INDEX", 
    doc_type="DOC_TYPE", 
    body=es_query,
    size=2000,
    scroll="1m")

data = []
for hit in result["hits"]["hits"]:
    for d in hit["_source"]["attributes"]["data_of_interest"]:
        data.append(d)
        do_something(*args)

scroll_id = result['_scroll_id']
scroll_size = result["hits"]["total"]

i = 0
while scroll_size > 0 and i < 100000:  # note: i counts scroll pages, not documents

    print("Scrolling ({})...".format(i))

    result = es.scroll(scroll_id=scroll_id, scroll="1m")
    scroll_id = result["_scroll_id"]
    scroll_size = len(result['hits']['hits'])

    # data = [] why redefining the list ? 
    for hit in result["hits"]["hits"]:
        for d in hit["_source"]["attributes"]["data_of_interest"]:
            data.append(d)
            do_something(*args)
    i += 1
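A more natural way to cap the total, without a hand-rolled counter, is to wrap a per-hit generator in `itertools.islice`. With a real cluster you would get such a generator from `elasticsearch.helpers.scan(es, query=es_query, index="INDEX", size=2000)`; the `scan_hits` function below is a stand-in that simulates what that generator yields, so the sketch is self-contained:

```python
from itertools import islice

def scan_hits(total=10_000_000, batch=2000):
    """Simulate a scroll over `total` documents fetched in `batch`-sized pages,
    yielding one hit dict per document (same shape as the hits in the question)."""
    fetched = 0
    while fetched < total:
        n = min(batch, total - fetched)
        for i in range(n):
            yield {"_source": {"attributes": {"data_of_interest": [fetched + i]}}}
        fetched += n

MAX_DOCS = 100_000
data = []
# islice stops the (lazy) generator cleanly after MAX_DOCS hits,
# so only ~50 pages of 2K are ever fetched out of the 10M available.
for hit in islice(scan_hits(), MAX_DOCS):
    data.extend(hit["_source"]["attributes"]["data_of_interest"])

print(len(data))  # 100000
```

The loop body stays identical to the original; the cap lives entirely in `islice`, so there is no counter to increment or off-by-one to get wrong.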

Upvotes: 1
