Reputation: 301
I have used multiprocessing.dummy.Pool and multiprocessing.Pool in multiple deep learning projects, but I am having a hard time understanding multiprocessing.Queue. I don't understand why it is needed. Is there a special condition where it is useful?
As an example, I have the following target function:
def process_detection(det_, dims, classes):
    W = dims[0]
    H = dims[1]
    boxes = []
    confidences = []
    classIDs = []
    classes_pred = []
    for detection in det_:
        xcenter, ycenter, width, height = np.asarray([W, H, W, H]) * detection[0:4]
        confidence_encoded = detection[5:]            # (80,) array
        index_class = np.argmax(confidence_encoded)   # index of max confidence
        confidence = confidence_encoded[index_class]  # float value of confidence (probability)
        class_predicted = classes[index_class]        # class predicted
        if confidence > 0.5:
            if class_predicted == "person":
                print("{} , {:.2f}".format(class_predicted, confidence))
                # continue
            topX = int(xcenter - width/2.)
            topY = int(ycenter - height/2.)
            width = int(width)
            height = int(height)
            confidence = float(confidence)
            bbox = [topX, topY, width, height]
            boxes.append(bbox)
            confidences.append(confidence)
            classIDs.append(index_class)
            classes_pred.append(class_predicted)
    return [boxes, confidences, classIDs, classes_pred]
I am using multiprocessing.Pool.starmap to process a list of bounding boxes predicted by YOLOv3. The relevant function is below:
def main():
    pool = Pool(processes=os.cpu_count())  # make a process pool for multi-processing
    path = Path("..")
    classes = open(str(path.joinpath("coco.names")), "r").read().strip().split("\n")
    colors_array = np.random.randint(0, 255, (len(classes), 3), dtype="uint8")
    colors = {cls_: clr for cls_, clr in zip(classes, colors_array)}

    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    # reading the video
    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    cap = cv2.VideoCapture(str(path.joinpath("video_.mp4")))
    _, frame = cap.read()
    if frame is None:
        print("FRAME IS NOT READ")
    else:
        # frame = resize(frame, width=500)
        H, W = frame.shape[0:2]

    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    # <model>
    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    configPath = path.joinpath("yolov3.cfg")
    weightsPath = path.joinpath("yolov3.weights")
    net = cv2.dnn.readNetFromDarknet(str(configPath), str(weightsPath))
    ln = net.getLayerNames()
    ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]

    writer = None
    boxes = []
    confidences = []
    classIDs = []
    classes_pred = []
    fps_ = FPS().start()
    i = 0
    while True:
        # pool = Pool(processes=os.cpu_count())  # make a process pool for multi-processing
        try:
            if writer is None:
                writer = cv2.VideoWriter("./detections.avi", cv2.VideoWriter_fourcc(*"MJPG"),
                                         int(cap.get(cv2.CAP_PROP_FPS)), (W, H))
                # after this writer will not be None
            _, frame = cap.read()  # reading the frame
            # frame = resize(frame, width=W)  # resizing the frame
            blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416),
                                         swapRB=True, crop=False)  # yolov3 version
            net.setInput(blob)
            start = time()
            detections = net.forward(ln)
            end = time()
            print(f"{(end-start):.2f} seconds taken for detection")

            # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
            # MULTIPROCESSING
            # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
            results = pool.starmap_async(process_detection,
                                         zip(detections, repeat((W, H)), repeat(classes)))
            boxes, confidences, classIDs, classes_pred = results.get()[1]  # keeps only the second output layer's results
            # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

            cleaned_indices = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.3)  # 0.3 --> NMS threshold
            print(f"TOTAL INDICES CLEANED ARE {len(cleaned_indices):00d}")
            if len(cleaned_indices) > 0:
                for cleaned_idx in cleaned_indices.flatten():
                    BOX = boxes[cleaned_idx]
                    color = [int(i) for i in colors[classes_pred[cleaned_idx]]]
                    # print(colors[cleaned_idx], cleaned_idx)
                    cv2.rectangle(frame, (BOX[0], BOX[1]), (BOX[0] + BOX[2], BOX[1] + BOX[3]),
                                  color, 1, cv2.LINE_AA)
                    text = f"{classes_pred[cleaned_idx]} : {confidences[cleaned_idx]:.2f}"
                    cv2.putText(frame, text, (BOX[0], BOX[1] - 5), cv2.FONT_HERSHEY_SIMPLEX,
                                0.5, color, 2)
            writer.write(frame)
(The pool is closed outside the while loop.)
Upvotes: 0
Views: 133
Reputation: 43495
In general it is neither necessary nor useful to combine a Pool with a Queue.
A Pool is most useful for running the same code on different data in parallel across multiple cores to increase throughput, i.e. using the map method and its variants. This suits situations where the calculation done on each data item is independent of all the others.
Mechanisms like Queue and Pipe, on the other hand, are for communicating between different processes.
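To make that distinction concrete, here is a minimal producer/consumer sketch (the worker function and sentinel protocol are illustrative, not from your code): a Queue passes data to a separately started Process, which is exactly the situation a Pool does not cover.

```python
from multiprocessing import Process, Queue

def summing_worker(q, out):
    """Consume numbers from q until a None sentinel arrives, then report the sum."""
    total = 0
    while True:
        item = q.get()        # blocks until the producer sends something
        if item is None:      # sentinel: no more work
            break
        total += item
    out.put(total)            # send the result back to the parent process

if __name__ == "__main__":
    q, out = Queue(), Queue()
    p = Process(target=summing_worker, args=(q, out))
    p.start()
    for n in range(10):
        q.put(n)              # producer side: feed data as it becomes available
    q.put(None)               # tell the worker to stop
    p.join()
    print(out.get())          # sum of 0..9
```

Note how the worker blocks on `q.get()` whenever the producer is slow; that blocking is precisely what hurts you if you try the same pattern inside pool workers.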
If you need a Queue or a Pipe in a pool worker, then the calculations done by that worker are by definition not independent. At best, that reduces the performance of the Pool, because the workers might have to wait for data to become available. At worst, it might stall the Pool completely, if all the workers are busy waiting for data to appear from a Queue.
Pool
If you expect that all the calculations will take approximately the same time, just use the map method. It returns when all calculations are finished, and the returned values are guaranteed to be in the same order as the submitted data.
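A minimal sketch of that usage (the `square` function is just a stand-in for your per-item calculation):

```python
from multiprocessing import Pool

def square(x):
    """An independent per-item calculation: no shared state, no communication."""
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # map blocks until every item is done; results come back
        # in the same order as the inputs.
        results = pool.map(square, range(8))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```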
(Hint: there is little point in using the _async methods when the very next thing you do is call the get method on the result object.)
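In other words, the pattern in your loop is equivalent to the plain blocking call; a quick sketch (with an illustrative `double` function):

```python
from multiprocessing import Pool

def double(x):
    return 2 * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        eager = pool.map(double, range(4))             # blocks until done
        lazy = pool.map_async(double, range(4)).get()  # same result, extra steps
    print(eager == lazy)  # True
```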
If some calculations take (much) longer than others, I would suggest using imap_unordered. It returns an iterator that starts yielding results as soon as they are ready. The results arrive in the order they finished, not the order they were submitted, so you should add some identifier to each result that tells you which input it belongs to.
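A sketch of that identifier trick, assuming a hypothetical `work` function with uneven run times: the task index is passed in with the data and returned alongside the result, so each yielded result can be matched to its input.

```python
import time
from multiprocessing import Pool

def work(args):
    idx, delay = args
    time.sleep(delay)       # simulate tasks of very different durations
    return idx, delay * 10  # carry the identifier along with the result

if __name__ == "__main__":
    tasks = [(0, 0.3), (1, 0.1), (2, 0.2)]
    with Pool(processes=3) as pool:
        # Results are yielded as soon as each task finishes,
        # which is generally not the submission order.
        for idx, result in pool.imap_unordered(work, tasks):
            print(f"task {idx} finished with result {result}")
```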
Upvotes: 2