Reputation: 1116
I want to make repeated requests to a server that will return with some tasks. The response from the server will be a dictionary with a list of functions that need to be called. For example:
{
tasks: [
{
function: "HelloWorld",
id: 1212
},
{
function: "GoodbyeWorld"
id: 1222
}
]
}
NOTE: I'm dummying it down.
For each of these tasks, I will run the specified function using multiprocessing
. Here is an example of my code:
r = requests.get('https://localhost:5000', auth=('user', 'pass'))
data = r.json()
if len(data["tasks"]) > 0:
manager = multiprocessing.Manager()
for task in data["tasks"]:
if task["function"] == "HelloWorld":
helloObj = HelloWorldClass()
hello = multiprocessing.Process(target=helloObj.helloWorld)
hello.start()
hello.join()
elif task["function"] == "GoodbyeWorld":
byeObj = GoodbyeWorldClass()
bye = multiprocessing.Process(target=byeObj.byeWorld)
bye.start()
bye.join()
The problem is, I want to make repeated requests and fill the data["tasks"]
array as the other processes are running. If I throw everything into some while
loop, it'll only make a request after all the processes from the initial response is done (when join()
has been reached for all processes).
Can anyone help me to make repeated requests and fill the array continuously? Please let me know if I need to make any clarifications.
Upvotes: 2
Views: 627
Reputation: 16590
You have a bug in your program, you should call the join
s after you've created all the tasks. Join blocks until the process has finished -- in your case before you start the next one. Which practically makes you whole program run sequentially.
Upvotes: 1
Reputation: 2320
If I understood you correctly, you need something like this:
import time
from multiprocessing import Process
import requests
from task import FunctionFactory
def get_tasks():
resp = requests.get('https://localhost:5000', auth=('user', 'pass'))
data = resp.json()
return data['tasks']
if __name__ == '__main__':
procs = {}
for _ in range(10):
tasks = get_tasks()
if not tasks:
time.sleep(5)
continue
for task in tasks:
if task['id'] in procs:
# This task has been already submitted for execution.
continue
func = FunctionFactory.build(task['function'])
proc = Process(target=func)
proc.start()
procs[task['id']] = proc
# Waiting for all the submitted tasks to finish.
for proc in procs.values():
proc.join()
Here, the function get_tasks
is used to request a list of dictionaries with id
and function
keys from the server. In the main section, there is a procs
dictionary that maps id
to running process instances which execute functions built by a FunctionFactory
using received tasks' function
names. In the case there is already a running task with the same id, it gets ignored.
With this approach, you can request tasks as often as needed (here, 10
requests are used in a for
loop) and start processes to execute them in parallel. In the end, you just wait for all the submitted tasks to finish.
Upvotes: 2