Reindexing Elasticsearch via Bulk API, scan and scroll
Here is an example of reindexing to another Elasticsearch node using elasticsearch-py:
    from elasticsearch import Elasticsearch, helpers

    es_src = Elasticsearch(["host"])
    es_des = Elasticsearch(["host"])

    helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)
You can also reindex the results of a query to a different index. Here is how to do it:
    from elasticsearch import Elasticsearch, helpers

    es_src = Elasticsearch(["host"])
    es_des = Elasticsearch(["host"])

    body = {"query": {"term": {"year": "2004"}}}
    helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
Hi, you can use the scroll API to go through all the documents in the most efficient way. The scroll_id identifies a search context that is stored on the server for your specific scroll request, so you need to pass the scroll_id back with each request to obtain the next batch of results.
The bulk API is for indexing documents more efficiently. When copying an index you need both, but they are not otherwise related.
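To make the interplay of the two APIs concrete, here is a minimal Python sketch of the scan-and-scroll plus bulk pattern described above. The `reindex_with_scroll` function and the `FakeClient` are hypothetical: `client` stands for any object exposing `search`/`scroll`/`bulk` calls in the style of elasticsearch-py, and `FakeClient` only simulates paged results so the control flow can be followed without a live cluster.

```python
def reindex_with_scroll(client, src_index, dest_index, page_size=2):
    """Copy every document from src_index to dest_index.

    Opens a scroll context, then repeatedly passes the scroll_id back to
    the server to fetch the next page, bulk-indexing each page until an
    empty page signals that the scroll is exhausted.
    """
    page = client.search(index=src_index, scroll="2m", size=page_size)
    copied = 0
    while True:
        hits = page["hits"]["hits"]
        if not hits:  # empty page: the scroll is exhausted
            break
        actions = [
            {"_index": dest_index, "_id": hit["_id"], "_source": hit["_source"]}
            for hit in hits
        ]
        client.bulk(actions)  # one bulk request per scroll page
        copied += len(actions)
        page = client.scroll(scroll_id=page["_scroll_id"], scroll="2m")
    return copied


class FakeClient:
    """Stand-in that serves documents in pages, like the scroll API does."""

    def __init__(self, docs):
        self.docs = docs
        self.indexed = []   # collects all bulk actions
        self._cursor = 0
        self._size = 0

    def search(self, index, scroll, size):
        self._cursor, self._size = 0, size
        return self._page()

    def scroll(self, scroll_id, scroll):
        return self._page()

    def bulk(self, actions):
        self.indexed.extend(actions)

    def _page(self):
        hits = [
            {"_id": str(i), "_source": doc} for i, doc in enumerate(self.docs)
        ][self._cursor:self._cursor + self._size]
        self._cursor += self._size
        return {"_scroll_id": "scroll-1", "hits": {"hits": hits}}


client = FakeClient([{"n": 1}, {"n": 2}, {"n": 3}])
total = reindex_with_scroll(client, "src_index_name", "des_index_name")
```

With a real elasticsearch-py client you would let the server manage the scroll context instead, but the loop structure stays the same: search once, then scroll until an empty page comes back.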
I have some Java code that might help you get a better idea of how it works.
    public void reIndex() {
        logger.info("Start creating a new index based on the old index.");
        SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
                .setQuery(matchAllQuery())
                .setSearchType(SearchType.SCAN)
                .setScroll(createScrollTimeoutValue())
                .setSize(SCROLL_SIZE)
                .execute().actionGet();

        BulkProcessor bulkProcessor = BulkProcessor.builder(client, createLoggingBulkProcessorListener())
                .setBulkActions(BULK_ACTIONS_THRESHOLD)
                .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
                .setFlushInterval(createFlushIntervalTime())
                .build();

        while (true) {
            searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                    .setScroll(createScrollTimeoutValue())
                    .execute().actionGet();
            if (searchResponse.getHits().getHits().length == 0) {
                logger.info("Closing the bulk processor");
                bulkProcessor.close();
                break; // Break condition: no hits are returned
            }
            for (SearchHit hit : searchResponse.getHits()) {
                IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
                request.source(hit.sourceRef());
                bulkProcessor.add(request);
            }
        }
    }
For anyone who runs into this problem, you can use the following API from the Python client to reindex:
https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex
This saves you from having to use scan and scroll to fetch all the data and the bulk API to index it into the new index yourself.