"select" on multiple Python multiprocessing Queues? "select" on multiple Python multiprocessing Queues? python python

"select" on multiple Python multiprocessing Queues?


Actually you can use multiprocessing.Queue objects in select.select. i.e.

que = multiprocessing.Queue()(input,[],[]) = select.select([que._reader],[],[])

would select que only if it is ready to be read from.

No documentation about it though. I was reading the source code of the multiprocessing.queue library (at linux it's usually sth like /usr/lib/python2.6/multiprocessing/queue.py) to find it out.

With Queue.Queue I didn't have found any smart way to do this (and I would really love to).


It doesn't look like there's an official way to handle this yet. Or at least, not based on this:

You could try something like what this post is doing -- accessing the underlying pipe filehandles:

and then use select.


Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.

My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

My code for doing this is:

"""Allow multiple queues to be waited upon.queue,value = multiq.select(list_of_queues)"""import queueimport threadingclass queue_reader(threading.Thread):    def __init__(self,inq,sharedq):        threading.Thread.__init__(self)        self.inq = inq        self.sharedq = sharedq    def run(self):        while True:            data = self.inq.get()            print ("thread reads data=",data)            result = (self.inq,data)            self.sharedq.put(result)class multi_queue(queue.Queue):    def __init__(self,list_of_queues):        queue.Queue.__init__(self)        for q in list_of_queues:            qr = queue_reader(q,self)            qr.start()def select(list_of_queues):    outq = queue.Queue()    for q in list_of_queues:        qr = queue_reader(q,outq)        qr.start()    return outq.get()

The following test routine shows how to use it:

import multiqimport queueq1 = queue.Queue()q2 = queue.Queue()q3 = multiq.multi_queue([q1,q2])q1.put(1)q2.put(2)q1.put(3)q1.put(4)res=0while not res==4:    while not q3.empty():        res = q3.get()[1]        print ("returning result =",res)

Hope this helps.

Tony Wallace