How to optimize multiprocessing in Python


I want my program to read in images from a video stream in the main process

In a producer/consumer implementation, which is what you have above, the producer (the part that puts tasks into the queue for the consumers to execute) needs to be separate from the main/controlling process, so that it can add tasks in parallel with the main process reading output from the results queue.

Try the following. I have added a sleep in the consumer process to simulate processing. Further consumers can be started in the same way to run in parallel.

It would also be a good idea to limit the size of the task queue, so that memory usage does not run away if processing cannot keep up with the input stream. You can specify a size when calling Queue(<size>). Once the queue holds that many items, calls to .put will block until the queue is no longer full.
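To see the bounded-queue behaviour in isolation, here is a minimal sketch. The helper name `fill_bounded_queue` and the string "frames" are made up for illustration. It passes a timeout to `put`, so a full queue raises `queue.Full` instead of blocking forever.

```python
import multiprocessing
import queue


def fill_bounded_queue(maxsize, items, timeout=0.1):
    """Try to put every item into a queue limited to maxsize entries.

    Returns (accepted, rejected, drained). Hypothetical helper, only
    meant to show what happens once the queue is full.
    """
    q = multiprocessing.Queue(maxsize)
    accepted, rejected = [], []
    for item in items:
        try:
            # Without a timeout this call would block while the queue is full
            q.put(item, timeout=timeout)
            accepted.append(item)
        except queue.Full:
            rejected.append(item)
    # Read the accepted items back so the queue is empty when we return
    drained = [q.get(timeout=1) for _ in accepted]
    return accepted, rejected, drained


if __name__ == "__main__":
    print(fill_bounded_queue(2, ["frame0", "frame1", "frame2"]))
```

Nothing reads from the queue while it is being filled, so the third item is rejected after the timeout. In a real pipeline a consumer would be freeing slots in the meantime.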

import time
import multiprocessing
import cv2


class ImageProcessor(multiprocessing.Process):
    def __init__(self, tasks_q, results_q):
        multiprocessing.Process.__init__(self)
        self.tasks_q = tasks_q
        self.results_q = results_q

    def run(self):
        while True:
            image = self.tasks_q.get()
            # Do computations on image (simulated here with a sleep)
            time.sleep(1)
            # Send back the result to display on the stream
            self.results_q.put("text")


def capture_display_video(vc, tasks_q, results_q):
    text = ""
    rval, frame = vc.read()
    while rval:
        # Copy the frame so the text drawn below is not sent to the processor
        image = frame.copy()
        if not tasks_q.full():
            tasks_q.put(image)
        if not results_q.empty():
            text = results_q.get()
        cv2.putText(frame, text, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.imshow("preview", frame)
        # imshow only refreshes the window inside waitKey; Esc quits
        if cv2.waitKey(1) & 0xFF == 27:
            break
        rval, frame = vc.read()


if __name__ == "__main__":
    cv2.namedWindow("preview")
    vc = cv2.VideoCapture(0)
    if not vc.isOpened():
        raise Exception("Cannot capture video")

    # Tasks queue with size 1 - only want one image queued
    # for processing.
    # Queue size should therefore match number of processes
    tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
    processor = ImageProcessor(tasks_q, results_q)
    processor.start()

    capture_display_video(vc, tasks_q, results_q)
    processor.terminate()
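The point made at the start of this answer, a producer that is separate from the main process, can be sketched without a camera roughly like this. Everything here is an assumption for illustration: integers stand in for frames, `None` is used as an end-of-stream sentinel, and the names `producer`, `consumer` and `run_pipeline` are made up.

```python
import multiprocessing
import time

STOP = None  # end-of-stream sentinel (an assumption of this sketch)


def producer(tasks_q, n_frames):
    """Stand-in for the capture loop: queue fake frames, then the sentinel."""
    for frame_num in range(n_frames):
        tasks_q.put(frame_num)  # blocks while the bounded queue is full
    tasks_q.put(STOP)


def consumer(tasks_q, results_q):
    """Process frames until the sentinel arrives, then pass it on."""
    while True:
        frame_num = tasks_q.get()
        if frame_num is STOP:
            results_q.put(STOP)
            break
        time.sleep(0.01)  # simulated processing
        results_q.put("result from image %d" % frame_num)


def run_pipeline(n_frames=5):
    tasks_q = multiprocessing.Queue(1)
    results_q = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=producer, args=(tasks_q, n_frames)),
        multiprocessing.Process(target=consumer, args=(tasks_q, results_q)),
    ]
    for worker in workers:
        worker.start()
    # The main process only reads results while the producer keeps feeding
    results = []
    while True:
        item = results_q.get()
        if item is STOP:
            break
        results.append(item)
    for worker in workers:
        worker.join()
    return results


if __name__ == "__main__":
    for line in run_pipeline():
        print(line)
```

With a single consumer the results come back in input order. With several consumers you would need one sentinel per consumer and could no longer rely on ordering.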


Here's a more elegant (IMHO) solution that utilizes multiple processes for processing your frames:

import multiprocessing
import cv2


def process_image(args):
    image, frame = args
    # Do computations on image
    return "text", frame


def image_source():
    # Creating window and starting video capturer from camera
    cv2.namedWindow("preview")
    vc = cv2.VideoCapture(0)
    # Try to get the first frame
    if vc.isOpened():
        rval, frame = vc.read()
    else:
        rval = False
    while rval:
        # The image to analyse is taken here to be a copy of the frame
        image = frame.copy()
        yield image, frame
        # Getting next frame from camera
        rval, frame = vc.read()


if __name__ == "__main__":
    pool = multiprocessing.Pool()
    for (text, frame) in pool.imap(process_image, image_source()):
        # Add text to frame
        cv2.putText(frame, text, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        # Showing the frame with all the applied modifications
        cv2.imshow("preview", frame)
        # imshow only refreshes the window inside waitKey; Esc quits
        if cv2.waitKey(1) & 0xFF == 27:
            break
    pool.terminate()

Pool.imap should allow you to iterate through the pool's results while it's still processing other images from your cam.
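If you want to check how Pool.imap behaves without a camera attached, a small sketch like the following may help. The generator `frame_source` and the integer "frames" are hypothetical stand-ins. The results arrive in the same order as the inputs, even though several worker processes handle them.

```python
import multiprocessing


def process_image(frame_num):
    """Stand-in for the per-frame work: return a label and the frame number."""
    return "text for frame %d" % frame_num, frame_num


def frame_source(n_frames):
    """Stand-in for the camera generator (hypothetical)."""
    for frame_num in range(n_frames):
        yield frame_num


def run_imap(n_frames=6, processes=2):
    results = []
    with multiprocessing.Pool(processes) as pool:
        # imap hands results back in input order as they become available
        for text, frame_num in pool.imap(process_image, frame_source(n_frames)):
            results.append((frame_num, text))
    return results


if __name__ == "__main__":
    for frame_num, text in run_imap():
        print(frame_num, text)
```

If display order does not matter, `pool.imap_unordered` takes the same arguments and yields each result as soon as it is ready.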


(Updated solution based on your last code sample)

It will get images from the stream, put one in the task queue as soon as it is available, and display the last image with the last text.

I put an active loop in there to simulate processing that takes longer than the time between two images. It means that the text displayed is not necessarily the one belonging to the image, but the last one computed. If the processing is fast enough, the shift between image and text should be limited.

Note that I force the calls to get/put and wrap them in try/except. Per the docs, empty() and full() are not reliable.
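As a rough illustration of that pattern, the non-blocking calls can be wrapped in two small helpers that catch only `queue.Full` and `queue.Empty` rather than every exception. The helper names `try_put` and `try_get` are made up for this sketch.

```python
import multiprocessing
import queue


def try_put(q, item):
    """Queue item without blocking; return False if the queue is full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False


def try_get(q, default=None):
    """Fetch an item without blocking; return default if none is ready."""
    try:
        return q.get_nowait()
    except queue.Empty:
        return default


if __name__ == "__main__":
    tasks = multiprocessing.Queue(1)
    print(try_put(tasks, "frame 1"))  # there is room, so the frame is queued
    print(try_put(tasks, "frame 2"))  # queue is full, so the frame is dropped
    print(try_get(multiprocessing.Queue(), default="no result yet"))
    tasks.get(timeout=1)  # empty the queue before exiting
```

Catching the specific exceptions means that a real error, such as an object that cannot be pickled, is not silently swallowed.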

import cv2
import multiprocessing
import queue
import random


class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # Other initialization stuff

    def run(self):
        while True:
            frameNum, frameData = self.task_queue.get()
            # Do computations on image
            # Simulate a processing longer than image fetching
            m = random.randint(0, 1000000)
            while m >= 0:
                m -= 1
            # Put result in queue
            self.result_queue.put("result from image " + str(frameNum))


if __name__ == "__main__":
    # No more than one pending task
    tasks = multiprocessing.Queue(1)
    results = multiprocessing.Queue()
    # Init and start consumer
    consumer = Consumer(tasks, results)
    consumer.start()

    # Creating window and starting video capturer from camera
    cv2.namedWindow("preview")
    vc = cv2.VideoCapture(0)
    # Try to get the first frame
    if vc.isOpened():
        rval, frame = vc.read()
    else:
        rval = False
    if rval:
        frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)

    # Dummy int to represent frame number for display
    frameNum = 0
    # String for result (empty until the first result arrives)
    text = ""
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Process loop
    while rval:
        # Grab image from stream
        frameNum += 1
        # Put image in task queue if empty
        try:
            tasks.put_nowait((frameNum, frame.copy()))
        except queue.Full:
            pass
        # Get result if ready
        try:
            # Use this if processing is fast enough
            # text = results.get(timeout=0.4)
            # Use this to prefer smooth display over frame/text shift
            text = results.get_nowait()
        except queue.Empty:
            pass
        # Add last available text to last image and display
        print("display:", frameNum, "|", text)
        # Showing the frame with all the applied modifications
        cv2.putText(frame, text, (10, 25), font, 1, (255, 0, 0), 2)
        cv2.imshow("preview", frame)
        # imshow only refreshes the window inside waitKey; Esc quits
        if cv2.waitKey(1) & 0xFF == 27:
            break
        # Getting next frame from camera
        rval, frame = vc.read()
        # Optional image resize
        # frame = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)

    consumer.terminate()

Here is some output (from a Python 2 run, hence the tuple formatting). You can see the delay between image and result, and the result catching up.

> ('display:', 493, '|', 'result from image 483')
> ('display:', 494, '|', 'result from image 483')
> ('display:', 495, '|', 'result from image 489')
> ('display:', 496, '|', 'result from image 490')
> ('display:', 497, '|', 'result from image 495')
> ('display:', 498, '|', 'result from image 496')
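The same kind of lag can be reproduced without a camera. This is only a sketch under assumptions: frame numbers stand in for images, the timings are arbitrary, and `slow_worker` and `simulate` are made-up names. It records, for each displayed frame, which frame the latest available result belongs to.

```python
import multiprocessing
import queue
import time


def slow_worker(tasks, results):
    """Echo back each frame number after a simulated processing delay."""
    while True:
        frame_num = tasks.get()
        if frame_num is None:  # sentinel: stop the worker
            break
        time.sleep(0.02)
        results.put(frame_num)


def simulate(n_frames=30, frame_interval=0.005):
    tasks = multiprocessing.Queue(1)  # at most one pending frame
    results = multiprocessing.Queue()
    worker = multiprocessing.Process(target=slow_worker, args=(tasks, results))
    worker.start()
    last_result = None
    log = []
    for frame_num in range(1, n_frames + 1):
        try:
            tasks.put_nowait(frame_num)  # frame is dropped if the worker is busy
        except queue.Full:
            pass
        try:
            last_result = results.get_nowait()
        except queue.Empty:
            pass  # keep showing the previous result
        log.append((frame_num, last_result))
        time.sleep(frame_interval)  # simulated time between two frames
    tasks.put(None)
    worker.join()
    return log


if __name__ == "__main__":
    for displayed, result_for in simulate():
        print("display:", displayed, "| result from image", result_for)
```

The exact numbers vary from run to run, but a result never refers to a frame newer than the one being displayed.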