balu

Reputation: 121

Dynamic argument for multiprocessing

I have the following issue: I have a dict with a few hundred keys (still about 150 MB in total), each holding a complex value made up of dicts, lists and single values. I have 3 streams of incoming information, arriving with 1 s, 0.1 s, and real-time timings, depending on the data type. To speed up data processing I want to use multiprocessing to create 3 processes, one per source, preferably with each process having its own pool to speed things up further.

The issue is how to "chop" the general dict into updateable pieces. It seems to me that with a pool or a process I have to decide the list of arguments up front, when I initialize the process/pool. What my task needs is something like this: I get a message that key "A" needs to be updated, so I assign a worker to update it, passing it the message containing the new info and the complex object of "A" (or at least the relevant value of "A"). I definitely don't want to pass the whole dict to every worker, because that uses a lot of memory.

In this sample code I would want to pass only general_dict['A']['a'] when the first key/value pair of example_data_a is processed, general_dict['B']['a'] for the third pair, and so on; the same goes for example_data_b. How should I pass the arguments?

import multiprocessing

general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                'B': {'a': [3, 4, 5], 'b': 'test2'},
                'C': {'a': [6, 7, 8], 'b': 'test3'}}

example_data_a = ['A', [2,1,2],
                  'A', [2,3,2],
                  'B', [3,0,5],
                  'C', [6,1,8]]

example_data_b = ['A', 'test11',
                  'B', 'test21',
                  'B', 'test22',
                  'C', 'test31']

def update_a(x):
    ...

def update_b(y):
    ...

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = update_a)
    p2 = multiprocessing.Process(target = update_b)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
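
For illustration, this is roughly the kind of per-key call I have in mind (the two-argument worker signature is made up; it is only meant to show which slice of general_dict I want to hand to each worker):

# Hypothetical calls, only to illustrate which pieces get passed:
# first pair of example_data_a ('A', [2, 1, 2]) -> only the 'a' value of key 'A'
update_a(general_dict['A']['a'], [2, 1, 2])
# third pair of example_data_a ('B', [3, 0, 5]) -> only the 'a' value of key 'B'
update_a(general_dict['B']['a'], [3, 0, 5])
# second pair of example_data_b ('B', 'test21') -> only the 'b' value of key 'B'
update_b(general_dict['B']['b'], 'test21')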

Upvotes: 0

Views: 722

Answers (1)

Booboo

Reputation: 44128

I get your idea, but the problem is that any key could come through any of the three streams, so splitting the dictionary by stream does not sound like the most workable approach. It seems to me you should have one process reading the input streams, and there should be no need to split the dictionary at all. Instead, you have the three processes you envisioned, each handling roughly a third of the keys. Each process is started at the beginning and is passed its own multiprocessing.Queue instance as an input queue, and all three are passed a common result queue for passing back return values. A thread started by the main process continuously gets from the result queue and updates the dictionary with the returned values.

This is the general idea:

from multiprocessing import Process, Queue
from threading import Thread


def update_a(input_queue, result_queue):
    while True:
        # Wait for next request:
        x = input_queue.get()
        if x is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)

def update_b(input_queue, result_queue):
    while True:
        # Wait for next request:
        y = input_queue.get()
        if y is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)

def update_c(input_queue, result_queue):
    while True:
        # Wait for next request:
        z = input_queue.get()
        if z is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)


def process_results():
    sentinels_seen = 0
    # Have all 3 processes finished?
    while sentinels_seen < 3:
        # Get next result
        result = result_queue.get()
        if result is None:
            # Sentinel
            sentinels_seen += 1
        else:
            # Update general_dict with result:
            ...

def process_input_stream():
    while True:
        # When we have decided that we are through processing input
        # break out of the loop:
        if through_processing:
            break
        # Get input from one of 3 sources and depending on key
        # put the "argument" to either a_q, b_q or c_q to be handled respectively
        # by either update_a, update_b or update_c.
        # The result will be put to result queue which will be processed by our
        # process_results thread.
        ...

    # Add a sentinel to each of the input queues:
    a_q.put(None)
    b_q.put(None)
    c_q.put(None)

if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q = Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    t = Thread(target=process_results)
    p1.start()
    p2.start()
    p3.start()
    t.start()

    process_input_stream()

    p1.join()
    p2.join()
    p3.join()
    t.join()
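
To make the skeleton more concrete, here is one way the elided parts could look, using the example_data_a and example_data_b lists from the question as stand-ins for the streams. The (key, field, value) tuple format, the key-to-queue mapping and the pass-through "processing" are only illustrative placeholders; update_b and update_c would follow the same pattern as update_a:

def update_a(input_queue, result_queue):
    while True:
        x = input_queue.get()
        if x is None:
            result_queue.put(None)
            return
        key, field, new_value = x
        # ... the real (possibly expensive) processing would happen here ...
        result_queue.put((key, field, new_value))

def process_results():
    sentinels_seen = 0
    while sentinels_seen < 3:
        result = result_queue.get()
        if result is None:
            sentinels_seen += 1
        else:
            key, field, value = result
            # Only this thread in the main process ever touches general_dict:
            general_dict[key][field] = value

def process_input_stream():
    # Illustrative dispatch: each worker owns a fixed subset of the keys,
    # regardless of which of the three streams an update arrived on.
    key_to_queue = {'A': a_q, 'B': b_q, 'C': c_q}
    for key, new_list in zip(example_data_a[0::2], example_data_a[1::2]):
        key_to_queue[key].put((key, 'a', new_list))
    for key, new_text in zip(example_data_b[0::2], example_data_b[1::2]):
        key_to_queue[key].put((key, 'b', new_text))
    # Tell each worker there is nothing more coming:
    a_q.put(None)
    b_q.put(None)
    c_q.put(None)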

Note:

If you find there is so much contention between the process_results thread and the process_input_stream loop, because of the GIL, that the latter cannot keep up with the input stream, then do not run process_results as a thread. Instead, just start and join the three processes as before and then finally call process_results as an ordinary function in the main process. You will, of course, lose any concurrency between reading the input stream and updating the dictionary that way:

if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q = Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    p1.start()
    p2.start()
    p3.start()

    process_input_stream()

    p1.join()
    p2.join()
    p3.join()

    process_results()

Upvotes: 1
