
How to execute multiple SQL queries into pandas dataframes in parallel


Use N connections in N threads, then join the threads and process the results.

    # imports
    import ceODBC
    import numpy as np
    import pandas as pd
    import pandas.io.sql as psql
    from ConfigParser import ConfigParser
    import os
    import glob
    import threading

    # db connection string
    cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

    # directories (these should also be moved to a config file)
    dataDir = os.getcwd() + '\\data\\'
    sqlDir = os.getcwd() + '\\sql\\'

    # shared variable to store the results
    responses = {}
    responses_lock = threading.Lock()

    # semaphore to limit the number of concurrent DB connections
    maxconnections = 8
    pool_sema = threading.BoundedSemaphore(value=maxconnections)

    def task(fname):
        # read the SQL from file
        with open(fname, 'r') as f:
            sql = f.read()
        # connect to the DB, run the SQL, load the result into a dataframe,
        # then close the connection; the semaphore limits concurrent connections
        pool_sema.acquire()
        cnxn = ceODBC.connect(cnxn_string)
        df = psql.frame_query(sql, cnxn)
        cnxn.close()
        pool_sema.release()
        # the lock ensures only one thread modifies the shared dict at a time
        responses_lock.acquire()
        responses[fname] = df
        responses_lock.release()

    pool = []
    # find the SQL files and spawn one thread per file
    for fname in glob.glob(os.path.join(sqlDir, '*.sql')):
        # create a new thread running the task
        thread = threading.Thread(target=task, args=(fname,))
        thread.daemon = True
        # keep the thread in the pool and start it
        pool.append(thread)
        thread.start()

    # wait until all threads are done
    for thread in pool:
        thread.join()

    # the result of each query is now stored in the responses dict

Each SQL file is executed in its own thread, and all results are collected in a single shared dict.
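
As a quick illustration (not part of the original answer), once the threads have been joined the dict can be consumed directly, or, assuming all queries return compatible columns, stacked into one dataframe:

    # inspect each result by file name
    for fname, df in responses.items():
        print(fname, df.shape)

    # assumption: the queries return compatible columns, so they can be stacked
    combined = pd.concat(responses.values(), ignore_index=True)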

An equivalent version of the task function, using with statements for the semaphore and the lock:

    def task(fname):
        # read the SQL from file
        with open(fname, 'r') as f:
            sql = f.read()
        # the semaphore limits the number of concurrent DB connections
        with pool_sema:
            cnxn = ceODBC.connect(cnxn_string)
            # run the query and load the result into a dataframe
            df = psql.frame_query(sql, cnxn)
            cnxn.close()
        # the lock ensures only one thread modifies the shared dict at a time
        with responses_lock:
            responses[fname] = df
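
On newer Python versions roughly the same pattern can be written with concurrent.futures, which manages the thread pool, the result collection and the connection limit for you. This is only a sketch, not from the original answer: it assumes Python 3, the cnxn_string and sqlDir defined above, and pandas.read_sql in place of the long-deprecated psql.frame_query.

    import glob
    import os
    from concurrent.futures import ThreadPoolExecutor

    import ceODBC
    import pandas as pd

    def run_query(fname):
        # read the SQL from file and run it on its own connection
        with open(fname, 'r') as f:
            sql = f.read()
        cnxn = ceODBC.connect(cnxn_string)
        try:
            return fname, pd.read_sql(sql, cnxn)
        finally:
            cnxn.close()

    # max_workers doubles as the connection limit, so no semaphore is needed
    with ThreadPoolExecutor(max_workers=8) as executor:
        responses = dict(executor.map(run_query, glob.glob(os.path.join(sqlDir, '*.sql'))))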

multiprocessing.Pool is convenient for distributing CPU-heavy tasks, but it adds extra IO of its own, because every result has to be serialized and sent back to the parent process.
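
For comparison, a rough sketch of the multiprocessing.Pool variant (again not from the original answer): each dataframe is pickled and shipped back to the parent process, which is the extra IO mentioned above. It reuses the hypothetical run_query helper from the previous sketch and assumes that it, cnxn_string and sqlDir are defined at module level so the worker processes can import them.

    from multiprocessing import Pool

    if __name__ == '__main__':
        with Pool(processes=8) as pool:
            responses = dict(pool.map(run_query, glob.glob(os.path.join(sqlDir, '*.sql'))))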