How to execute multiple SQL queries into pandas dataframes in parallel
Use N connections in N threads. Then join the threads and process the results.
# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser
import os
import glob
import threading

# db connection string (should also be moved to config)
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.getcwd() + '\\data\\'
sqlDir = os.getcwd() + '\\sql\\'

# shared dict holding one DataFrame per SQL file; guarded by responses_lock
responses = {}
responses_lock = threading.Lock()

# semaphore caps the number of simultaneous connections opened on the DB
maxconnections = 8
pool_sema = threading.BoundedSemaphore(value=maxconnections)


def task(fname):
    """Run the SQL stored in file *fname* and save the resulting
    DataFrame into the shared ``responses`` dict, keyed by file name."""
    with open(fname, 'r') as f:
        sql = f.read()
    # Connect to db, run SQL, read result into a dataframe, close connection.
    # The semaphore limits how many connections hit the DB at once; the
    # try/finally blocks guarantee release/close even if the query fails.
    pool_sema.acquire()
    try:
        cnxn = ceODBC.connect(cnxn_string)
        try:
            df = psql.frame_query(sql, cnxn)
        finally:
            cnxn.close()
    finally:
        pool_sema.release()
    # only one thread at a time may modify the shared results dict
    responses_lock.acquire()
    try:
        responses[fname] = df
    finally:
        responses_lock.release()


pool = []
# find sql files and spawn one worker thread per file
for fname in glob.glob(os.path.join(sqlDir, '*sql')):
    thread = threading.Thread(target=task, args=(fname,))
    thread.daemon = True
    pool.append(thread)
    thread.start()

# wait until every thread's task is done
for thread in pool:
    thread.join()
# results of each execution are stored in the responses dict
Each file executes in a separate thread. Each result is stored in one shared variable.
Equivalent version of the function using the `with`
statement:
def task(fname):
    """Run the SQL stored in file *fname* and save the resulting
    DataFrame into the shared ``responses`` dict (context-manager variant)."""
    with open(fname, 'r') as f:
        sql = f.read()
    # Connect to db, run SQL, read result into a dataframe, close connection.
    # The semaphore limits how many connections hit the DB at once.
    with pool_sema:
        cnxn = ceODBC.connect(cnxn_string)
        try:
            df = psql.frame_query(sql, cnxn)
        finally:
            # close the connection even if the query raises
            cnxn.close()
    # only one thread at a time may modify the shared results dict
    with responses_lock:
        responses[fname] = df
multiprocessing.Pool
is easy to use for distributing heavy tasks, but it involves more I/O overhead itself (data is serialized between processes).