When using ElasticSearch Scroll API, how to optimize the time parameter in situ?

OK, I did some data analysis and found a few things empirically. For each of several page sizes I ran 10-20 pages of a scroll API query. For a fixed size, the time it took to return a page was roughly Gaussian, with the means given below.

    means = {
        1000: 6.0284869194030763,
        1500: 7.9487858772277828,
        2000: 12.139444923400879,
        2500: 18.494202852249146,
        3000: 22.169868159294129,
        3500: 28.091009926795959,
        4000: 36.068559408187866,
        5000: 53.229292035102844,
    }
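One way to read these numbers is to divide each mean by its page size. This is a minimal sketch, assuming all the means are in the same time unit; the name `per_hit` is mine and not part of the original analysis.

```python
# Mean page time for each scroll size, copied from the measurements above.
means = {
    1000: 6.0284869194030763,
    1500: 7.9487858772277828,
    2000: 12.139444923400879,
    2500: 18.494202852249146,
    3000: 22.169868159294129,
    3500: 28.091009926795959,
    4000: 36.068559408187866,
    5000: 53.229292035102844,
}

# Time per returned hit for each size; the unit is whatever the means use.
per_hit = {size: t / size for size, t in sorted(means.items())}

for size, cost in per_hit.items():
    print("size=%d  mean=%.2f  per-hit=%.5f" % (size, means[size], cost))
```

In this data the per-hit cost at size 5000 is roughly 1.8 times the cost at size 1000, so larger pages were not cheaper per document here.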

The next thought I had was that this may depend on whether other queries are being run on the machine, so I ran the experiment with half of the pages fetched while the scroll was the only request to ES and half while a second scroll query was running. The timing didn't seem to change.
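To compare the two conditions, the page times can be collected and summarized separately for each. This is a rough sketch of that measurement, not the script I actually ran. `time_pages`, `summarize` and the `fetch_page` callable are hypothetical names, with `fetch_page` standing in for whatever issues one scroll request.

```python
import statistics
import time


def time_pages(fetch_page, n_pages):
    """Call fetch_page() n_pages times and return the elapsed seconds of each call."""
    durations = []
    for _ in range(n_pages):
        start = time.perf_counter()
        fetch_page()
        durations.append(time.perf_counter() - start)
    return durations


def summarize(durations):
    """Return (mean, sample standard deviation) of a list of durations."""
    return statistics.mean(durations), statistics.stdev(durations)


def fake_fetch():
    """Stand-in for a real scroll request; replace with a call to your client."""
    time.sleep(0.01)


mean, sd = summarize(time_pages(fake_fetch, 5))
print("mean=%.4fs sd=%.4fs" % (mean, sd))
```

Running it once with the cluster otherwise idle and once with a second scroll in flight gives two (mean, standard deviation) pairs to compare.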

Finally, since the times will depend on the given ES configuration, bandwidth, etc., I propose this solution.

  1. Set a generous page time for the initial page.
  2. Time each page.
  3. Use a weighted running average of the observed time plus a little padding and the initial time, so your time parameter is always a bit bigger than needed but decreases toward the mean plus the padding. Here's an example:

    import math
    import time

    # client, index, doc_type and q (the query body) are assumed to be defined elsewhere.
    tries = 0
    size = 3000
    wait_time = 120.0  ## generous start time, in seconds
    returned_hits = {}  ## page number -> list of hits

    while tries < 3:
        try:
            print("\n\tRunning the alert scroll query with size = %s... " % size)
            page = client.search(index=index, doc_type=doc_type, body=q, scroll='1m',
                                 search_type='scan', size=size)
            sid = page['_scroll_id']  ## scroll id
            total_hits = page['hits']['total']  ## how many results there are
            print("\t\t There are %s hits total." % total_hits)
            p = 0  ## page count
            doc_count = 0  ## document count
            scroll_size = total_hits  ## the scan request itself returns no hits, so seed the loop
            # Start scrolling
            while scroll_size > 0:
                p += 1
                print("\t\t Scrolling to page %s ..." % p)
                start = time.time()
                ## pass the keep-alive as whole seconds, rounded up
                page = client.scroll(scroll_id=sid, scroll='%ds' % math.ceil(wait_time))
                end = time.time()
                ## update wait_time (seconds) using a weighted running average, with 10 s of padding
                wait_time = ((end - start + 10) + float(wait_time * p)) / (p + 1)
                print("\t\t Page %s took %s seconds. We change the time to %s" % (p, end - start, wait_time))
                sid = page['_scroll_id']  # Update the scroll ID
                scroll_size = len(page["hits"]["hits"])  ## no. of hits returned on this page
                print("\t\t Page %s has returned %s hits. Storing .." % (p, scroll_size))
                returned_hits[p] = page['hits']['hits']
                doc_count += scroll_size  ## update the total count of docs processed
                print("\t\t Returned and stored %s docs of %s \n" % (doc_count, total_hits))
            break  ## success, so exit the retry loop
        except Exception as e:
            print("\t\t ---- Error on try %s\n\t\t size was %s, wait_time was %s s, \n\t\terror message = %s" % (tries, size, wait_time, e))
            tries += 1  ## increment tries, and do it again until 3 tries
            # wait_time *= 2  ## double the time interval for the next go round
            size = int(.8 * size)  ## lower size of docs per shard returned
            if tries == 3:
                print("\t\t three strikes and you're out! (failed three times in a row to execute the alert query). Exiting. ")
            else:
                print('\t\t ---- trying again for the %s-th time ...' % (tries + 1))
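The update rule from the loop above can also be pulled out into a small function and tried without a cluster. This is a sketch under two assumptions: the estimate is kept in seconds, and the page times in the list are made up for illustration, not measured. `next_wait_time` and `scroll_param` are hypothetical helper names.

```python
import math


def next_wait_time(wait_time, observed, page, padding=10.0):
    """Cumulative average of the previous estimate and the padded observation.

    wait_time: current estimate in seconds
    observed:  seconds the latest page took
    page:      1-based number of the page just fetched
    padding:   safety margin in seconds added to each observation
    """
    return ((observed + padding) + wait_time * page) / (page + 1)


def scroll_param(wait_time):
    """Format an estimate as a whole number of seconds, rounded up, e.g. '62s'."""
    return "%ds" % math.ceil(wait_time)


wait_time = 120.0  # generous initial value, in seconds
# Illustrative page times in seconds (invented for this example).
for page, observed in enumerate([22.0, 23.5, 21.0, 22.5], start=1):
    wait_time = next_wait_time(wait_time, observed, page)
    print(page, round(wait_time, 2), scroll_param(wait_time))
```

Because the initial value counts as just one sample in the average, the estimate drops quickly at first and then settles toward the mean page time plus the padding.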