ElasticSearch Scroll API with multi threading
Have you tried a sliced scroll? According to the linked docs:
For scroll queries that return a lot of documents it is possible to split the scroll in multiple slices which can be consumed independently.
and
Each scroll is independent and can be processed in parallel like any scroll request.
I have not used this myself (the largest result set I need to process is ~50k documents) but this seems to be what you're looking for.
You should used sliced scroll for that, see https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-372271460 on how to do it in python.
I met the same problem as yours, but the doc size is 1.4 million. I've had to use concurrency method and use 10 threads for data writting.
I wrote the code with Java thread pool, and you can find the similar way in Python.
public class ControllerRunnable implements Runnable { private String i_res; private String i_scroll_id; private int i_index; private JSONArray i_hits; private JSONObject i_result; ControllerRunnable(int index_copy, String _scroll_id_copy) { i_index = index_copy; i_scroll_id = _scroll_id_copy; } @Override public void run(){ try { s_logger.debug("index:{}", i_index ); String nexturl = m_scrollUrl.replace("--", i_scroll_id); s_logger.debug("nexturl:{}", nexturl); i_res = get(nexturl); s_logger.debug("i_res:{}", i_res); i_result = JSONObject.parseObject(i_res); if (i_result == null) { s_logger.info("controller thread parsed result object NULL, res:{}", i_res); s_counter++; return; } i_scroll_id = (String) i_result.get("_scroll_id"); i_hits = i_result.getJSONObject("hits").getJSONArray("hits"); s_logger.debug("hits content:{}\n", i_hits.toString()); s_logger.info("hits_size:{}", i_hits.size()); if (i_hits.size() > 0) { int per_thread_data_num = i_hits.size() / s_threadnumber; for (int i = 0; i < s_threadnumber; i++) { Runnable worker = new DataRunnable(i * per_thread_data_num, (i + 1) * per_thread_data_num); m_executor.execute(worker); } // Wait until all threads are finish m_executor.awaitTermination(1, TimeUnit.SECONDS); } else { s_counter++; return; } } catch (Exception e) { s_logger.error(e.getMessage(),e); } } }