"select" on multiple Python multiprocessing Queues?
Actually you can use multiprocessing.Queue objects in select.select. i.e.
que = multiprocessing.Queue()(input,[],[]) = select.select([que._reader],[],[])
would select que only if it is ready to be read from.
No documentation about it though. I was reading the source code of the multiprocessing.queue library (at linux it's usually sth like /usr/lib/python2.6/multiprocessing/queue.py) to find it out.
With Queue.Queue I didn't have found any smart way to do this (and I would really love to).
It doesn't look like there's an official way to handle this yet. Or at least, not based on this:
You could try something like what this post is doing -- accessing the underlying pipe filehandles:
and then use select.
Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.
My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.
My code for doing this is:
"""Allow multiple queues to be waited upon.queue,value = multiq.select(list_of_queues)"""import queueimport threadingclass queue_reader(threading.Thread): def __init__(self,inq,sharedq): threading.Thread.__init__(self) self.inq = inq self.sharedq = sharedq def run(self): while True: data = self.inq.get() print ("thread reads data=",data) result = (self.inq,data) self.sharedq.put(result)class multi_queue(queue.Queue): def __init__(self,list_of_queues): queue.Queue.__init__(self) for q in list_of_queues: qr = queue_reader(q,self) qr.start()def select(list_of_queues): outq = queue.Queue() for q in list_of_queues: qr = queue_reader(q,outq) qr.start() return outq.get()
The following test routine shows how to use it:
import multiqimport queueq1 = queue.Queue()q2 = queue.Queue()q3 = multiq.multi_queue([q1,q2])q1.put(1)q2.put(2)q1.put(3)q1.put(4)res=0while not res==4: while not q3.empty(): res = q3.get()[1] print ("returning result =",res)
Hope this helps.
Tony Wallace