When using the Elasticsearch scroll API, how do you optimize the time parameter in situ?
OK, I did some data analysis and found a few things empirically. For a range of `size` values I ran 10-20 pages of a scroll API query. For a fixed `size`, the time it took to return a page was roughly Gaussian, with the means (in seconds) given below.
| size | mean page time (s) |
|-----:|-------------------:|
| 1000 | 6.03 |
| 1500 | 7.95 |
| 2000 | 12.14 |
| 2500 | 18.49 |
| 3000 | 22.17 |
| 3500 | 28.09 |
| 4000 | 36.07 |
| 5000 | 53.23 |
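For reference, per-page times like these can be collected with something along these lines (a minimal sketch, not the exact benchmarking script I used: the client setup, the `my-index` name, and the `match_all` query body are placeholder assumptions):

```python
import time

from elasticsearch import Elasticsearch

client = Elasticsearch()          ## assumed connection settings
q = {"query": {"match_all": {}}}  ## placeholder query body

def time_pages(size, n_pages=15, index="my-index"):
    """Return per-page fetch times (in seconds) for one scroll query."""
    page = client.search(index=index, body=q, scroll='2m', size=size)
    sid = page['_scroll_id']
    timings = []
    for _ in range(n_pages):
        start = time.time()
        page = client.scroll(scroll_id=sid, scroll='2m')
        timings.append(time.time() - start)
        sid = page['_scroll_id']
        if not page['hits']['hits']:  ## stop early if the index is exhausted
            break
    return timings
```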
My next thought was that this might depend on whether other queries were running against the cluster, so I reran the experiment with half of the pages fetched while mine was the only request to ES, and half while a second scroll query was running concurrently. The timing didn't seem to change.
Finally, since the times will depend on the given ES configuration, bandwidth, etc., I propose this solution:
- Set a generous scroll time for the initial page.
- Time each page.
- Use a weighted running average between the observed time plus a small pad and the initial time, so your time parameter is always a bit bigger than needed but decays toward the observed mean plus the pad. Here's an example:
```python
import sys
import time

## assumes `client` is an elasticsearch-py Elasticsearch instance, and that
## `index`, `doc_type`, and the query body `q` are defined elsewhere

tries = 0
size = 3000
wait_time = 120.0   ## generous start time, in seconds
returned_hits = {}  ## page number -> list of hits

while tries < 3:
    try:
        print "\n\tRunning the alert scroll query with size = %s... " % (size)
        page = client.search(index=index, doc_type=doc_type, body=q,
                             scroll='1m', search_type='scan', size=size)
        sid = page['_scroll_id']            ## scroll id
        total_hits = page['hits']['total']  ## how many results there are
        print "\t\t There are %s hits total." % (total_hits)

        p = 0            ## page count
        doc_count = 0    ## document count
        scroll_size = 1  ## dummy value so we enter the loop

        # Start scrolling
        while scroll_size > 0:
            p += 1
            print "\t\t Scrolling to page %s ..." % p
            start = time.time()
            page = client.scroll(scroll_id=sid, scroll=str(int(wait_time)) + 's')
            end = time.time()

            ## update wait_time using a weighted running average:
            ## observed time + 10 s pad, blended with the current estimate
            wait_time = ((end - start + 10) + wait_time * p) / (p + 1)
            print "\t\t Page %s took %s seconds. We change the time to %s" % (p, end - start, wait_time)

            sid = page['_scroll_id']                 ## update the scroll id
            scroll_size = len(page['hits']['hits'])  ## no. of hits returned on this page
            print "\t\t Page %s has returned %s hits. Storing ..." % (p, scroll_size)
            returned_hits[p] = page['hits']['hits']
            doc_count += scroll_size                 ## update the total count of docs processed
            print "\t\t Returned and stored %s docs of %s \n" % (doc_count, total_hits)

        tries = 3  ## set tries to three so we exit the while loop!

    except Exception:
        e = sys.exc_info()[0]
        print "\t\t ---- Error on try %s\n\t\t size was %s, wait_time was %s s, \n\t\terror message = %s" % (tries, size, wait_time, e)
        tries += 1             ## increment tries, and do it again until 3 tries
        # wait_time *= 2       ## alternative: double the time interval for the next go round
        size = int(.8 * size)  ## lower the number of docs per shard returned
        if tries == 3:
            print "\t\t three strikes and you're out! (failed three times in a row to execute the alert query). Exiting."
        else:
            print '\t\t ---- trying again for the %s-th time ...' % (tries + 1)
```
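To see the update rule in isolation: each page's new estimate is the padded observation averaged against the previous estimate, with the estimate weighted by the number of pages seen so far. A standalone sketch (the helper name and the sample numbers are just for illustration; the 10-second pad matches the script above):

```python
def update_wait_time(wait_time, observed, p, pad=10.0):
    """Weighted running average: blend the padded observation with the
    current estimate, weighting the estimate by the page count p."""
    return ((observed + pad) + wait_time * p) / (p + 1)

## Example: starting from a generous 120 s, with pages observed at ~22 s,
## the estimate decays toward the observed mean + pad (~32 s):
w = 120.0
for p, obs in enumerate([22.1, 21.8, 22.5], start=1):
    w = update_wait_time(w, obs, p)
    ## p=1: 76.05, p=2: 61.30, p=3: 54.10 -> decaying toward ~32 s
```

Because the previous estimate is weighted by p, the influence of the generous initial value decays like 1/p, so after a few dozen pages the scroll parameter is essentially the observed mean plus the pad.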