Waleed Saleh
Waleed Saleh

Reputation: 37

Simple multiprocessing code, doesn't call callback

I have that simple code, and it doesn't call the pipeline function at all I don't know where is the problem, the function is not called, is it a problem of importing those stuff ? There is a tensorflow import that is a class that uses it

import numpy as np
import time
import Sort_Algorithm
import cv2
import os
import argparse
import helpers
import detector
import json

from collections import defaultdict
from shapely.geometry import Point, LineString
import pandas as pd
from io import StringIO
from matplotlib.path import Path
import time
import datetime
from interaction import (Behavior, Interaction, select_slice,
                                 CODE_DEFAULT, CODE_MSG, CODE_RES)
import asyncio

from geom import Region, compute_max_polygon_diagonal
from  parameter import RawParameterProcessor
from multiprocessing.pool import ThreadPool
from collections import deque


from multiprocessing import Pool, Queue



def pipeline():
    while True:
        print("hello")

def main():

    pool = Pool(processes=2)

    for i in range(0,10):
         pool.apply_async(pipeline, args = (i))
    pool.close()
    pool.join()
if __name__ == "__main__":

    main()

Upvotes: 0

Views: 57

Answers (1)

Guillaume Lastecoueres
Guillaume Lastecoueres

Reputation: 215

I hope you are doing good.

It seems the issue is that it silently failed.

I have used this simplify code of yours:

from multiprocessing import Pool, Queue


def pipeline(i):
    while True:
        print("hello")


def main():

    pool = Pool(processes=2)

    for i in range(0, 10):
        pool.apply_async(pipeline, args=(i,))
    pool.close()
    pool.join()


if __name__ == "__main__":

    main()

I have changed two things, args parameter in apply_async is now a tuple, the function pipeline takes correctly the given parameter by defining it in its function declaration.

pool.apply_async(pipeline, args=(i))
# to 
pool.apply_async(pipeline, args=(i,))

and:

def pipeline():
    while True:
        print("hello")
# to
def pipeline(i):
    while True:
        print("hello")

If you don't need at all the parameter in pipeline, you can also write the following code (we don't pass any parameter):

from multiprocessing import Pool, Queue


def pipeline():
    while True:
        print("hello")


def main():

    pool = Pool(processes=2)

    for i in range(0, 10):
        pool.apply_async(pipeline)
    pool.close()
    pool.join()


if __name__ == "__main__":

    main()

Have a lovely day.

G


OP mentioned a callback but was not using one. This extends the toy example to use a callback and gather the results from the apply_async calls.

def pipeline(i):
    return i*2

def cb(n):
    print(f'foo: {n}')

def main():

    pool = Pool(processes=2)
    results= []

    for i in range(0,8):
         results.append(pool.apply_async(pipeline, args=(i,), callback=cb))
    pool.close()
    pool.join()
    return results
if __name__ == "__main__":

    results = main()
    print([result.get() for result in results])

Which prints:

foo: 0
foo: 2
foo: 4
foo: 6
foo: 8
foo: 10
foo: 12
foo: 14
[0, 2, 4, 6, 8, 10, 12, 14]
>>> 

Upvotes: 2

Related Questions