jim

Reputation: 1116

How to make repeated requests for tasks

I want to make repeated requests to a server that will return with some tasks. The response from the server will be a dictionary with a list of functions that need to be called. For example:

{
   "tasks": [
      {
         "function": "HelloWorld",
         "id": 1212
      },
      {
         "function": "GoodbyeWorld",
         "id": 1222
      }
   ]
}

NOTE: I'm simplifying this example.

For each of these tasks, I will run the specified function using multiprocessing. Here is an example of my code:

import multiprocessing

import requests

r = requests.get('https://localhost:5000', auth=('user', 'pass'))
data = r.json()

if len(data["tasks"]) > 0:
    manager = multiprocessing.Manager()
    for task in data["tasks"]:
        if task["function"] == "HelloWorld":
            helloObj = HelloWorldClass()
            hello = multiprocessing.Process(target=helloObj.helloWorld)
            hello.start()
            hello.join()
        elif task["function"] == "GoodbyeWorld":
            byeObj = GoodbyeWorldClass()
            bye = multiprocessing.Process(target=byeObj.byeWorld)
            bye.start()
            bye.join()

The problem is, I want to keep making requests and keep filling the data["tasks"] array while the earlier processes are still running. If I wrap everything in a while loop, it only makes the next request after all the processes from the initial response are done (once join() has returned for every process).

Can anyone help me to make repeated requests and fill the array continuously? Please let me know if I need to make any clarifications.

Upvotes: 2

Views: 627

Answers (2)

Bastian Venthur

Reputation: 16590

You have a bug in your program: you should call the joins only after you've started all the processes. join() blocks until the process has finished -- in your case, before you start the next one -- which effectively makes your whole program run sequentially.
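
A minimal sketch of that fix, keeping the structure of your loop (HelloWorldClass and GoodbyeWorldClass are the classes from your example):

import multiprocessing

import requests

r = requests.get('https://localhost:5000', auth=('user', 'pass'))
data = r.json()

processes = []

for task in data["tasks"]:
    # Start every process first; don't block on join() inside the loop.
    if task["function"] == "HelloWorld":
        target = HelloWorldClass().helloWorld
    elif task["function"] == "GoodbyeWorld":
        target = GoodbyeWorldClass().byeWorld
    else:
        continue

    proc = multiprocessing.Process(target=target)
    proc.start()
    processes.append(proc)

# Only join once everything is running, so the tasks execute in parallel.
for proc in processes:
    proc.join()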

Upvotes: 1

constt

Reputation: 2320

If I understood you correctly, you need something like this:

import time
from multiprocessing import Process

import requests

from task import FunctionFactory


def get_tasks():
    resp = requests.get('https://localhost:5000', auth=('user', 'pass'))
    data = resp.json()

    return data['tasks']


if __name__ == '__main__':
    procs = {}

    for _ in range(10):
        tasks = get_tasks()

        if not tasks:
            time.sleep(5)
            continue

        for task in tasks:

            if task['id'] in procs:
                # This task has been already submitted for execution.
                continue

            func = FunctionFactory.build(task['function'])

            proc = Process(target=func)
            proc.start()

            procs[task['id']] = proc

    # Waiting for all the submitted tasks to finish.
    for proc in procs.values():
        proc.join()

Here, the function get_tasks requests a list of task dictionaries (each with id and function keys) from the server. In the main section, the procs dictionary maps each task id to the running Process instance that executes the function built by FunctionFactory from the task's function name. If a task arrives whose id has already been submitted, it is ignored.

With this approach, you can request tasks as often as needed (here, 10 requests are made in a for loop) and start processes to execute them in parallel. In the end, you just wait for all the submitted tasks to finish.
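
The task module imported above isn't shown here; a minimal sketch of what FunctionFactory could look like (this module and its function names are hypothetical, matching the question's example):

# task.py -- hypothetical module assumed by the code above.

def hello_world():
    print('Hello, world!')


def goodbye_world():
    print('Goodbye, world!')


class FunctionFactory:
    # Maps the server's function names to local callables.
    _functions = {
        'HelloWorld': hello_world,
        'GoodbyeWorld': goodbye_world,
    }

    @staticmethod
    def build(name):
        # Raises KeyError for unknown task names.
        return FunctionFactory._functions[name]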

Upvotes: 2
