How to optimize multiprocessing in Python
I want my program to read in images from a video stream in the main process
In producer/consumer implementations, which is what you have above, the producer, what puts tasks into the queue to be executed by the consumers, needs to be separate from the main/controlling process so that it can add tasks in parallel with the main process reading output from results queue.
Try the following. Have added a sleep in the consumer processes to simulate processing and added a second consumer to show they are being run in parallel.
It would also be a good idea to limit the size of the task queue to avoid having it run away with memory usage if processing cannot keep up with input stream. Can specify a size when calling Queue(<size>)
. If the queue is at that size, calls to .put
will block until the queue is not full.
import timeimport multiprocessingimport cv2class ImageProcessor(multiprocessing.Process): def __init__(self, tasks_q, results_q): multiprocessing.Process.__init__(self) self.tasks_q = tasks_q self.results_q = results_q def run(self): while True: image = self.tasks_q.get() # Do computations on image time.sleep(1) # Display the result on stream self.results_q.put("text")# Tasks queue with size 1 - only want one image queued# for processing. # Queue size should therefore match number of processestasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()processor = ImageProcessor(tasks_q, results_q)processor.start()def capture_display_video(vc): rval, frame = vc.read() while rval: image = frame.get_image() if not tasks_q.full(): tasks_q.put(image) if not results_q.empty(): text = results_q.get() cv2.putText(frame, text) cv2.imshow("preview", frame) rval, frame = vc.read()cv2.namedWindow("preview")vc = cv2.VideoCapture(0)if not vc.isOpened(): raise Exception("Cannot capture video")capture_display_video(vc)processor.terminate()
Here's a more elegant (IMHO) solution that utilizes multiple processes for processing your frames:
def process_image(args): image, frame = args #Do computations on image return "text", frameimport cv2pool = multiprocessing.Pool()def image_source(): #Creating window and starting video capturer from camera cv2.namedWindow("preview") vc = cv2.VideoCapture(0) #Try to get the first frame if vc.isOpened(): rval, frame = vc.read() else: rval = False while rval: yield image, frame # Getting next frame from camera rval, frame = vc.read()for (text, frame) in pool.imap(process_image, image_source()): # Add text to frame cv2.putText(frame, text) # Showing the frame with all the applied modifications cv2.imshow("preview", frame)
Pool.imap
should allow you to iterate through the pool's results while it's still processing other images from your cam.
(Updated solution based on you last code sample)
It will get images from the stream, put one in the task queue as soon as it is available, and display the last image with the last text.
I put some active loop in there to simulate a processing longer than the time between two images.I means that the text displayed is not necessarily the one belonging to the image, but the last one computed.If the processing is fast enough, the shift between image and text should be limited.
Note that I force calls to get/put with some try/catch. Per the doc, empty and full are not 100% accurate.
import cv2import multiprocessingimport randomfrom time import sleepclass Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue # Other initialization stuff def run(self): while True: frameNum, frameData = self.task_queue.get() # Do computations on image # Simulate a processing longer than image fetching m = random.randint(0, 1000000) while m >= 0: m -= 1 # Put result in queue self.result_queue.put("result from image " + str(frameNum)) return# No more than one pending tasktasks = multiprocessing.Queue(1)results = multiprocessing.Queue()# Init and start consumerconsumer = Consumer(tasks,results)consumer.start()#Creating window and starting video capturer from cameracv2.namedWindow("preview")vc = cv2.VideoCapture(0)#Try to get the first frameif vc.isOpened(): rval, frame = vc.read() frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)else: rval = False# Dummy int to represent frame number for displayframeNum = 0# String for resulttext = Nonefont = cv2.FONT_HERSHEY_SIMPLEX# Process loopwhile rval: # Grab image from stream frameNum += 1 # Put image in task queue if empty try: tasks.put_nowait((frameNum, frame)) except: pass # Get result if ready try: # Use this if processing is fast enough # text = results.get(timeout=0.4) # Use this to prefer smooth display over frame/text shift text = results.get_nowait() except: pass # Add last available text to last image and display print("display:", frameNum, "|", text) # Showing the frame with all the applied modifications cv2.putText(frame,text,(10,25), font, 1,(255,0,0),2) cv2.imshow("preview", frame) # Getting next frame from camera rval, frame = vc.read() # Optional image resize # frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)
Here is some output, you can see the delay between image and result, and the result catching back.
> ('display:', 493, '|', 'result from image 483')> ('display:', 494, '|', 'result from image 483')> ('display:', 495, '|', 'result from image 489')> ('display:', 496, '|', 'result from image 490')> ('display:', 497, '|', 'result from image 495')> ('display:', 498, '|', 'result from image 496')