Talha Yousuf

Reputation: 301

When to use multiprocessing.Queue over multiprocessing.Pool? When is there a need to use multiprocessing.Queue?

I have tried multiprocessing.dummy.Pool and multiprocessing.Pool in multiple deep learning projects, but I am having a hard time understanding multiprocessing.Queue; I don't understand why it is needed. Is there a special condition where it is useful?

As an example, I have the following target function:

import numpy as np

def process_detection(det_, dims, classes):

    W, H = dims
    boxes = []
    confidences = []
    classIDs = []
    classes_pred = []
    for detection in det_:
        # scale the normalized box coordinates back to frame size
        xcenter, ycenter, width, height = np.asarray([W, H, W, H]) * detection[0:4]
        confidence_encoded = detection[5:]  # (80,) array of class scores
        index_class = np.argmax(confidence_encoded)  # index of max confidence
        confidence = confidence_encoded[index_class]  # float value of confidence (probability)
        class_predicted = classes[index_class]  # class predicted

        if confidence > 0.5:
            if class_predicted == "person":
                print("{} , {:.2f}".format(class_predicted, confidence))
                topX = int(xcenter - width / 2.)
                topY = int(ycenter - height / 2.)
                width = int(width)
                height = int(height)
                confidence = float(confidence)

                bbox = [topX, topY, width, height]

                boxes.append(bbox)
                confidences.append(confidence)
                classIDs.append(index_class)
                classes_pred.append(class_predicted)

    return [boxes, confidences, classIDs, classes_pred]

I am using multiprocessing.Pool.starmap_async to process the lists of bounding boxes predicted by YOLOv3. The relevant function is below:

import os
from itertools import repeat
from multiprocessing import Pool
from pathlib import Path
from time import time

import cv2
import numpy as np
from imutils.video import FPS  # FPS counter from the imutils package


def main():

    pool = Pool(processes=os.cpu_count()) # make a process pool for multiprocessing

    path = Path("..")
    classes = open(str(path.joinpath("coco.names")), "r").read().strip().split("\n")
    colors_array = np.random.randint(0,255,(len(classes),3),dtype="uint8")
    colors = {cls_:clr for cls_,clr in zip(classes, colors_array)}


    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    #                reading the video
    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


    cap = cv2.VideoCapture(str(path.joinpath("video_.mp4")))

    _, frame = cap.read()

    if frame is None:
        print("FRAME IS NOT READ")
        return  # W and H would be undefined below, so bail out early
    else:
        # frame = resize(frame, width=500)
        H, W = frame.shape[0:2]

    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    #                <model>
    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    configPath = path.joinpath("yolov3.cfg")
    weightsPath = path.joinpath("yolov3.weights")


    net = cv2.dnn.readNetFromDarknet(str(configPath), str(weightsPath))
    ln = net.getLayerNames()
    # note: newer OpenCV releases return a flat array from getUnconnectedOutLayers(),
    # in which case this should be ln[i - 1] instead of ln[i[0] - 1]
    ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]



    writer = None


    boxes = []
    confidences = []
    classIDs = []
    classes_pred = []

    fps_ = FPS().start()
    i = 0

    while True:
        # pool = Pool(processes=os.cpu_count()) # make a process pool  for multi-processing
        try:
            if writer is None:

                writer = cv2.VideoWriter("./detections.avi", cv2.VideoWriter_fourcc(*"MJPG"), int(cap.get(cv2.CAP_PROP_FPS)), (W, H))
                # after this writer will not be none

            ret, frame = cap.read() # reading the frame
            if not ret:             # end of the video stream
                break
            # frame = resize(frame, width=W) # resizing the frame

            blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416),
                                         swapRB=True, crop=False) # yolov3 input size

            net.setInput(blob)

            start = time()
            detections = net.forward(ln)
            end = time()
            print(f"{(end-start):.2f} seconds taken for detection")

            # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
            #                MULTIPROCESSING
            # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


            results = pool.starmap_async(process_detection, zip(detections, repeat((W, H)), repeat(classes)))

            # results.get() holds one [boxes, confidences, classIDs, classes_pred]
            # list per YOLO output layer; [1] keeps only the second layer's output
            boxes, confidences, classIDs, classes_pred = results.get()[1]

            #%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
            cleaned_indices = cv2.dnn.NMSBoxes(boxes, confidences, 0.5 , 0.3) # 0.3 --> nms threshold
            print(f"TOTAL INDICES CLEANED ARE {len(cleaned_indices):00d}")

            if len(cleaned_indices)>0:

                for cleaned_idx in cleaned_indices.flatten():
                    BOX = boxes[cleaned_idx]
                    color = [int(i) for i in colors[classes_pred[cleaned_idx]]]
                    # print(colors[cleaned_idx], cleaned_idx)
                    cv2.rectangle(frame, (BOX[0],BOX[1]), (BOX[0]+BOX[2], BOX[1]+BOX[3]),color, 1, cv2.LINE_AA)
                    text = f"{classes_pred[cleaned_idx]} : {confidences[cleaned_idx]:.2f}"
                    cv2.putText(frame, text, (BOX[0], BOX[1] - 5), cv2.FONT_HERSHEY_SIMPLEX,
                                0.5, color, 2)

            writer.write(frame)

(The pool is closed OUTSIDE the while loop.)
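For completeness, a minimal sketch of the shutdown code referred to above; this part of the script was not shown, so the exact cleanup calls are assumptions:

    # after the while loop exits
    pool.close()      # no more tasks will be submitted
    pool.join()       # wait for all workers to finish
    writer.release()  # finalize the output video (assumed cleanup)
    cap.release()     # release the capture device (assumed cleanup)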

Upvotes: 0

Views: 133

Answers (1)

Roland Smith

Reputation: 43495

In general it is not necessary (nor useful) to use a Pool and a Queue together.

A Pool is most useful for running the same code on different data in parallel across multiple cores to increase throughput, i.e. using the map method and its variants. This works well when the calculation done on each data item is independent of all the others.

Mechanisms like Queue and Pipe are for communicating between different processes. If you need a Queue or a Pipe in a pool worker, then the calculations done by that pool worker are by definition not independent. At best, that reduces the performance of the Pool because the pool workers might have to wait for data to become available. At worst, it might stall the Pool completely if all the workers are busy waiting for data to appear from a Queue.
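As an illustration of what a Queue is actually for, here is a minimal sketch of two cooperating long-running processes, a producer streaming items to a consumer (the function names and the None sentinel are choices made for this example, not anything from the question):

from multiprocessing import Process, Queue

def producer(q):
    for item in range(5):
        q.put(item)        # send work to the consumer
    q.put(None)            # sentinel: no more data is coming

def consumer(q):
    while True:
        item = q.get()     # blocks until an item is available
        if item is None:   # sentinel received, stop
            break
        print(f"consumed {item}")

if __name__ == "__main__":
    q = Queue()
    p = Process(target=producer, args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start(); c.start()
    p.join(); c.join()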


How to use a Pool

If you expect that all the calculations will take approximately the same time, just use the map method. This will return when all calculations are finished. And the returned values are guaranteed to be in the same order as the submitted data. (Hint: there is little point in using the _async methods when the next thing you do is to call the get method on the result object.)
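As a sketch of that advice applied to the code in the question (reusing process_detection, detections, W, H, and classes from above), the starmap_async call followed by an immediate get collapses into a plain blocking starmap:

from itertools import repeat

# blocks until every output layer has been processed;
# results come back in the same order as the inputs
results = pool.starmap(process_detection, zip(detections, repeat((W, H)), repeat(classes)))
boxes, confidences, classIDs, classes_pred = results[1]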

If some calculations take (much) longer than others, I would suggest using imap_unordered. This returns an iterator that starts yielding results as soon as they are ready. The results arrive in the order they finished, not the order they were submitted, so you should add some identifier to each result so you can tell which input data it belongs to.
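A minimal sketch of that pattern (the squaring function and its inputs are made up for illustration); each task carries an identifier that is handed back alongside its result:

from multiprocessing import Pool

def work(item):
    ident, value = item
    return ident, value * value  # return the identifier with the result

if __name__ == "__main__":
    inputs = list(enumerate([3, 1, 4, 1, 5]))
    with Pool() as pool:
        # results are yielded as they finish, not in submission order
        for ident, result in pool.imap_unordered(work, inputs):
            print(f"input #{ident} -> {result}")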

Upvotes: 2
