When using ElasticSearch Scroll API, how to optimize the time parameter in situ?

OK, I did some data analysis and found a few things empirically. For a range of page sizes I ran 10-20 pages of a Scroll API query. For a fixed size, the time it took to return a page was roughly Gaussian, with means (in seconds, keyed by page size) given below.

    means = {1000: 6.0284869194030763, 1500: 7.9487858772277828,
             2000: 12.139444923400879, 2500: 18.494202852249146,
             3000: 22.169868159294129, 3500: 28.091009926795959,
             4000: 36.068559408187866, 5000: 53.229292035102844}
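For context, the timings were gathered with something like the sketch below. `client` is an elasticsearch-py client, and `index`, `doc_type`, and the query body `q` are placeholders; `search_type='scan'` matches the older client used in the full example further down.

    import time

    def time_scroll_pages(client, index, doc_type, q, size, n_pages=20):
        ## Open a scroll and record how long each page takes to return.
        page = client.search(index=index, doc_type=doc_type, body=q,
                             scroll='5m', search_type='scan', size=size)
        sid = page['_scroll_id']
        timings = []
        for _ in range(n_pages):
            start = time.time()
            page = client.scroll(scroll_id=sid, scroll='5m')
            timings.append(time.time() - start)
            sid = page['_scroll_id']
            if not page['hits']['hits']:
                break  ## scroll exhausted
        return timings

    ## mean page time for a given size:
    # timings = time_scroll_pages(client, index, doc_type, q, 3000)
    # mean = sum(timings) / float(len(timings))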

My next thought was that this might depend on whether other queries are hitting the machine, so I re-ran the experiment with half of the pages as the only request to ES and the other half while a second scroll query was running. The timings didn't noticeably change.
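The concurrent half of that experiment can be reproduced with something like this, reusing the hypothetical `time_scroll_pages` helper above to run a second scroll in a background thread while timing the first:

    import threading

    def timings_with_background_load(client, index, doc_type, q, size):
        ## Kick off a second scroll in a background thread, then time the
        ## first scroll as before while the second one is running.
        background = threading.Thread(target=time_scroll_pages,
                                      args=(client, index, doc_type, q, size))
        background.daemon = True
        background.start()
        return time_scroll_pages(client, index, doc_type, q, size)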

Finally, since the times will depend on the given ES configuration, bandwidth, etc., I propose the following:

  1. Set a generous scroll time for the initial page.
  2. Time each page.
  3. Use a weighted running average between the observed time plus a small margin and the current estimate, so your time parameter is always a bit larger than needed but decreases toward the mean page time plus the margin. Concretely, after page `p`: `wait_time = (observed + margin + wait_time * p) / (p + 1)`. Here's an example:

    import sys
    import time

    tries = 0
    size = 3000
    wait_time = 120  ## generous start for the scroll keep-alive, in seconds

    returned_hits = {}  ## page number -> list of hits

    while tries < 3:
        try:
            print "\n\tRunning the alert scroll query with size = %s... " % (size)
            page = client.search(index = index, doc_type = doc_type, body = q,
                                 scroll = '1m', search_type = 'scan', size = size)

            sid = page['_scroll_id']            ## scroll id
            total_hits = page['hits']['total']  ## how many results there are
            print "\t\t There are %s hits total." % (total_hits)

            p = 0            ## page count
            doc_count = 0    ## document count
            scroll_size = 1  ## dummy value so we enter the loop; updated per page

            # Start scrolling
            while scroll_size > 0:
                p += 1
                print "\t\t Scrolling to page %s ..." % p
                start = time.time()
                page = client.scroll(scroll_id = sid, scroll = '%ds' % wait_time)
                end = time.time()

                ## update wait_time (seconds) using a weighted running average,
                ## with a 10-second margin on top of the observed page time
                wait_time = (end - start + 10 + wait_time * p) / (p + 1)
                print "\t\t Page %s took %s seconds. We change the time to %s" % (p, end - start, wait_time)

                sid = page['_scroll_id']                 # update the scroll ID
                scroll_size = len(page["hits"]["hits"])  ## no. of hits returned on this page
                print "\t\t Page %s has returned %s hits. Storing .." % (p, scroll_size)
                returned_hits[p] = page['hits']['hits']
                doc_count += scroll_size  ## update the total count of docs processed
                print "\t\t Returned and stored %s docs of %s \n" % (doc_count, total_hits)

            tries = 3  ## set tries to three so we exit the outer while loop!
        except Exception:
            e = sys.exc_info()[0]
            print "\t\t ---- Error on try %s\n\t\t size was %s, wait_time was %s s, \n\t\terror message = %s" % (tries, size, wait_time, e)
            tries += 1  ## increment tries, and do it again until 3 tries
            # wait_time *= 2  ## double the time interval for the next go round
            size = int(.8 * size)  ## lower the number of docs per shard returned
            if tries == 3:
                print "\t\t three strikes and you're out! (failed three times in a row to execute the alert query). Exiting."
            else:
                print "\t\t ---- trying again for the %s-th time ..." % (tries + 1)
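A side note on the update rule: it is just the cumulative mean of the initial guess and every (observed + 10) value seen so far, so it converges to the mean page time plus the margin. Isolated as a helper (the `margin` parameter is my naming and generalizes the hard-coded 10-second cushion above):

    def update_wait_time(wait_time, observed, p, margin=10):
        ## Weighted running average: the old estimate carries weight p,
        ## the new observation (plus a safety margin) carries weight 1.
        return (observed + margin + wait_time * p) / float(p + 1)

    ## e.g. starting from a generous 120 s and observing pages near 20 s,
    ## the estimate drifts down toward 30 s (mean + margin):
    # w = 120.0
    # for p, obs in enumerate([22, 19, 21, 20], start=1):
    #     w = update_wait_time(w, obs, p)   # -> 76.0, 60.3, 53.0, 48.4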