sortas

Reputation: 1673

How to run multiple asynchronous processes in Python using multiprocessing?

I need to run multiple background asynchronous functions using multiprocessing. I have a working Popen solution, but it looks a bit unnatural. Example:

from time import sleep
from multiprocessing import Process, Value
import subprocess

def worker_email(keyword):
    subprocess.Popen(["python", "mongoworker.py", str(keyword)])
    return True

keywords_list = ['apple', 'banana', 'orange', 'strawberry']

if __name__ == '__main__':
    for keyword in keywords_list:
        # Do work
        p = Process(target=worker_email, args=(keyword,))
        p.start()
        p.join()

If I try not to use Popen, like:

def worker_email(keyword):
    print('Before:' + keyword)
    sleep(10)
    print('After:' + keyword)
    return True

the functions run one by one, with no concurrency. So, how can I run all the functions at the same time without using Popen?

UPD: I'm using multiprocessing.Value to return a result from the Process, like:

def worker_email(keyword, func_result):
    sleep(10)
    print('Yo:' + keyword)
    func_result.value = 1
    return True

func_result = Value('i', 0)
p = Process(target=worker_email, args=(doc['check_id'],func_result))
p.start()
# Change status
if func_result.value == 1:
    stream.update_one({'_id': doc['_id']}, {"$set": {"status": True}}, upsert=False)

But it doesn't work without .join(). Any ideas how to make it work, or another way to achieve the same thing? :)
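For what it's worth, the check fails because the parent reads func_result.value before the child has had a chance to write it. A minimal sketch of one fix (join just before reading; the keyword and sleep time here are placeholders, not the asker's real values):

```python
from time import sleep
from multiprocessing import Process, Value

def worker_email(keyword, func_result):
    sleep(0.1)               # simulated work
    func_result.value = 1    # signal success back to the parent

if __name__ == '__main__':
    func_result = Value('i', 0)
    p = Process(target=worker_email, args=('apple', func_result))
    p.start()
    # Right after start() the value is almost certainly still 0:
    # the parent races ahead of the child. Wait, then read.
    p.join()
    print(func_result.value)  # prints 1
```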

Upvotes: 2

Views: 5474

Answers (2)

sortas

Reputation: 1673

Solved the problem of getting the Process result by moving the result check and status update into the worker function. Something like:

# Update task status if work is done
def update_status(task_id, func_result):
    # Connect to DB
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    stream = db.stream

    # Update task status if OK
    if func_result:
        stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False)

    # Close DB connection
    client.close()

# Do work
def yo_func(keyword):
    sleep(10)
    print('Yo:' + keyword)
    return True

# Worker function
def worker_email(keyword, task_id):
    update_status(task_id, yo_func(keyword))

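A sketch of how these workers might then be launched concurrently. The DB update is stubbed out here (I can't assume a running MongoDB), and the keywords and task ids are placeholders:

```python
from time import sleep
from multiprocessing import Process

def yo_func(keyword):
    sleep(0.1)
    print('Yo:' + keyword)
    return True

def update_status(task_id, func_result):
    # Stub: the real version updates the MongoDB document as shown above
    print('Task %s done: %s' % (task_id, func_result))

def worker_email(keyword, task_id):
    update_status(task_id, yo_func(keyword))

if __name__ == '__main__':
    jobs = [Process(target=worker_email, args=(kw, i))
            for i, kw in enumerate(['apple', 'banana'])]
    for p in jobs:
        p.start()   # all workers run at once
    for p in jobs:
        p.join()    # wait for all of them before exiting
```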
Upvotes: 0

MegaIng

Reputation: 7886

If you just remove the line p.join(), it should work. You only need p.join() if you want to wait for a process to finish before executing further code. At the end of the program, Python waits for all processes to finish before exiting, so you don't need to worry about that.
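Applied to the question's loop, that means: start every process first, and only join (if at all) once they are all running. A sketch, with a shortened sleep:

```python
from time import sleep
from multiprocessing import Process

def worker_email(keyword):
    sleep(0.2)               # simulated work
    print('After:' + keyword)

if __name__ == '__main__':
    processes = []
    for keyword in ['apple', 'banana', 'orange', 'strawberry']:
        p = Process(target=worker_email, args=(keyword,))
        p.start()            # no join() inside the loop, so nothing blocks
        processes.append(p)
    for p in processes:
        p.join()             # optional: block only after all have started
```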

Upvotes: 2
