Always Sunny

Reputation: 38502

ElasticSearch Scroll API with multi threading

First of all, I want to let you guys know that I understand the basic workings of the ElasticSearch Scroll API. To use the Scroll API, we first call the search method with some scroll value such as 1m; it returns a _scroll_id that is used for each consecutive call to scroll, in a loop, until all of the docs have been returned. The problem is that I want to run the same process on a multi-threaded basis, not serially. For example:

If I have 300,000 documents, then I want to process/get the docs this way:

So my question is: since I didn't find any way to set a from value on the Scroll API, how can I make the scrolling process faster with threading, instead of processing the documents serially?

My sample Python code:

if index_name is not None and doc_type is not None and body is not None:
    es = init_es()
    page = es.search(index=index_name, doc_type=doc_type, scroll='30s', size=10, body=body)
    sid = page['_scroll_id']
    scroll_size = page['hits']['total']

    # Start scrolling
    while scroll_size > 0:

        print("Scrolling...")
        page = es.scroll(scroll_id=sid, scroll='30s')
        # Update the scroll ID
        sid = page['_scroll_id']

        print("scroll id: " + sid)

        # Get the number of results that we returned in the last scroll
        scroll_size = len(page['hits']['hits'])
        print("scroll size: " + str(scroll_size))

        print("scrolled data:")
        # Note: aggregations only appear on the initial search response,
        # not on scroll pages, so print the hits instead
        print(page['hits']['hits'])

Upvotes: 4

Views: 10652

Answers (4)

Jiancong

Reputation: 65

I met the same problem as yours, but my doc size was 1.4 million. I had to use a concurrent approach, with 10 threads for data writing.

I wrote the code with a Java thread pool; you can implement a similar approach in Python.

    public class ControllerRunnable implements Runnable {
        private String i_res;
        private String i_scroll_id;
        private int i_index;
        private JSONArray i_hits;
        private JSONObject i_result;

        ControllerRunnable(int index_copy, String _scroll_id_copy) {
            i_index = index_copy;
            i_scroll_id = _scroll_id_copy;
        }

        @Override
        public void run(){
            try {
                s_logger.debug("index:{}", i_index );
                String nexturl = m_scrollUrl.replace("--", i_scroll_id);
                s_logger.debug("nexturl:{}", nexturl);
                i_res = get(nexturl);

                s_logger.debug("i_res:{}", i_res);

                i_result = JSONObject.parseObject(i_res);
                if (i_result == null) {
                    s_logger.info("controller thread parsed result object NULL, res:{}", i_res);
                    s_counter++;
                    return;
                }
                i_scroll_id = (String) i_result.get("_scroll_id");
                i_hits = i_result.getJSONObject("hits").getJSONArray("hits");
                s_logger.debug("hits content:{}\n", i_hits.toString());

                s_logger.info("hits_size:{}", i_hits.size());

                if (i_hits.size() > 0) {
                    int per_thread_data_num = i_hits.size() / s_threadnumber;
                    for (int i = 0; i < s_threadnumber; i++) {
                        Runnable worker = new DataRunnable(i * per_thread_data_num,
                                (i + 1) * per_thread_data_num);
                        m_executor.execute(worker);
                    }
                    // Wait briefly for the workers; note that awaitTermination
                    // only blocks until the timeout and does not stop new
                    // submissions unless shutdown() is called first
                    m_executor.awaitTermination(1, TimeUnit.SECONDS);
                } else {
                    s_counter++;
                    return;
                }
            } catch (Exception e) {
                s_logger.error(e.getMessage(),e);
            }
        }
    }
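For comparison, here is a rough Python equivalent of the pattern above (a sketch with my own names, not the original code): the scroll itself stays sequential, but each page's hits are handed to a pool of worker threads. `es` stands for any elasticsearch-py client and `process` for your per-document writer function.

```python
from concurrent.futures import ThreadPoolExecutor

def scroll_and_process(es, index, body, process, workers=10,
                       scroll='2m', size=1000):
    """Scroll sequentially, but hand each page's hits to a thread pool."""
    page = es.search(index=index, scroll=scroll, size=size, body=body)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while page['hits']['hits']:
            for hit in page['hits']['hits']:
                pool.submit(process, hit)  # workers do the writing
            page = es.scroll(scroll_id=page['_scroll_id'], scroll=scroll)
    # leaving the `with` block waits for all submitted tasks to finish
```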

Upvotes: 1

Honza Král

Reputation: 3022

You should use sliced scroll for that; see https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460 for how to do it in Python.
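A minimal sketch of what the linked issue describes (the helper names and parameters here are my own, not taken from the issue): each slice is an independent scroll, so each one can be consumed in its own thread.

```python
def sliced_body(slice_id, max_slices, query=None):
    """Search body selecting one slice out of `max_slices`."""
    body = {"slice": {"id": slice_id, "max": max_slices}}
    if query is not None:
        body["query"] = query
    return body

def scroll_slice(es, index, slice_id, max_slices, scroll='2m', size=1000):
    """Yield every hit in one slice; different slices are independent
    scrolls and can run in different threads."""
    page = es.search(index=index, scroll=scroll, size=size,
                     body=sliced_body(slice_id, max_slices))
    while page['hits']['hits']:
        for hit in page['hits']['hits']:
            yield hit
        page = es.scroll(scroll_id=page['_scroll_id'], scroll=scroll)

# e.g. thread 0 consumes scroll_slice(es, "my-index", 0, 2),
#      thread 1 consumes scroll_slice(es, "my-index", 1, 2)
```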

Upvotes: 3

Thomas Decaux

Reputation: 22661

A single scroll must be consumed sequentially; that is how it works by design.

You can still use multiple threads, though; this is exactly what Elasticsearch is good at: parallelism.

An Elasticsearch index is composed of shards, which are the physical storage of your data. Shards can live on the same node or (better) on different nodes.

On the other hand, the search API offers a very nice option: _preference (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-preference.html)

So back to your app:

  1. Get the list of index shards (and nodes)
  2. Create a thread by shard
  3. Do the scroll search on each thread

Et voilà!
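The three steps above can be sketched in Python like this (an assumption on my part, not Thomas's code: `es` is an elasticsearch-py client, and you already know the shard count, e.g. from the index settings):

```python
from concurrent.futures import ThreadPoolExecutor

def scroll_shard(es, index, shard, scroll='2m', size=1000):
    """Step 3: scroll only the documents held by one shard, via _preference."""
    page = es.search(index=index, scroll=scroll, size=size,
                     preference='_shards:%d' % shard,
                     body={"query": {"match_all": {}}})
    hits = []
    while page['hits']['hits']:
        hits.extend(page['hits']['hits'])
        page = es.scroll(scroll_id=page['_scroll_id'], scroll=scroll)
    return hits

def scroll_all_shards(es, index, num_shards):
    """Steps 1-2: one thread per shard, each running its own scroll."""
    with ThreadPoolExecutor(max_workers=num_shards) as pool:
        parts = pool.map(lambda s: scroll_shard(es, index, s),
                         range(num_shards))
    return [hit for part in parts for hit in part]
```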

Also, you could use the elasticsearch-hadoop plugin, which does exactly that for Spark / Pig / MapReduce / Hive.

Upvotes: 0

Paul

Reputation: 20061

Have you tried a sliced scroll? According to the linked docs:

For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently.

and

Each scroll is independent and can be processed in parallel like any scroll request.

I have not used this myself (the largest result set I need to process is ~50k documents) but this seems to be what you're looking for.
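Putting the two quotes together, a sketch of slices consumed in parallel (the function names and the client object `es` are assumptions; sliced scroll itself requires Elasticsearch 5.0 or later):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_slice(es, index, slice_id, max_slices, scroll='2m', size=1000):
    """Collect every hit belonging to one slice of a sliced scroll."""
    page = es.search(index=index, scroll=scroll, size=size,
                     body={"slice": {"id": slice_id, "max": max_slices}})
    hits = []
    while page['hits']['hits']:
        hits.extend(page['hits']['hits'])
        page = es.scroll(scroll_id=page['_scroll_id'], scroll=scroll)
    return hits

def fetch_all(es, index, slices=4):
    """One scroll per slice, run in parallel, results merged at the end."""
    with ThreadPoolExecutor(max_workers=slices) as pool:
        parts = pool.map(lambda i: fetch_slice(es, index, i, slices),
                         range(slices))
    return [hit for part in parts for hit in part]
```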

Upvotes: 8
