galaxyan

Reputation: 6111

python multiprocessing pool assign object to worker

I have some objects that need to be processed. I wonder if there is a way to assign work (a worker process) to an object based on a unique key.
The first time the code sees an object it should be assigned to a random worker, but if the object appears again it should go to the worker that processed it before (a sketch of this mapping follows the example below). Thank you

for example:
workers: A, B, C | first bunch of objects: 1, 2, 3, 4 | second bunch of objects: 1, 3
first bunch objects:
worker A <--- 1,3
worker B <--- 2
worker C <--- 4
second bunch objects:
worker A <--- 1,3
worker B <---
worker C <---
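
A minimal sketch of the mapping being asked for, assuming a dict that caches the first (random) assignment; the names worker_for and assignments are illustrative, not from the original question:

import random

# Cache of key -> worker index, so repeated keys stay "sticky".
assignments = {}

def worker_for(key, num_workers):
    # First sighting: pick a random worker. Later sightings: reuse it.
    if key not in assignments:
        assignments[key] = random.randrange(num_workers)
    return assignments[key]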

Upvotes: 2

Views: 2123

Answers (1)

John Zwinck

Reputation: 249133

A very simple way to implement "sticky sessions" is to make your own version of multiprocessing.Pool which doesn't eagerly assign work items, but assigns them deterministically. Here's an incomplete but runnable solution:

import multiprocessing
import os
import time

def work(job):
    # Simulate a slow job, then report which process handled it.
    time.sleep(1)
    print("I am process", os.getpid(), "processing job", job)

class StickyPool:
    def __init__(self, processes):
        # One input queue per worker, so each worker gets its own job stream.
        self._inqueues = [multiprocessing.Queue() for ii in range(processes)]
        self._pool = [multiprocessing.Process(target=self._run, args=(self._inqueues[ii],))
                      for ii in range(processes)]
        for process in self._pool:
            process.start()

    def map(self, fn, args):
        for arg in args:
            # Deterministic routing: the same argument always hashes to the
            # same queue, and therefore to the same worker.
            ii = hash(arg) % len(self._inqueues)
            self._inqueues[ii].put((fn, arg))

    def _run(self, queue):
        # Worker loop: pull (function, argument) pairs and run them forever.
        while True:
            fn, arg = queue.get()
            fn(arg)

pool = StickyPool(3)
#pool = multiprocessing.Pool(3)   # swap in to compare against the standard pool

pool.map(work, [1,2,3,4,1,2,3,4,1,2,3,4])
time.sleep(4)

When using the above StickyPool, jobs are assigned based on the hash of their arguments, which means the same argument goes to the same process every time. It's not smart enough to distribute jobs evenly when many distinct values happen to hash to the same worker, but that's room for future improvement. (In Python 3, hash() of strings is randomized per interpreter run; since hashing happens only in the parent process here, assignments remain consistent within a single run, which is all stickiness requires.) I also didn't bother with shutdown logic, so the program doesn't stop running if you use StickyPool, though it does if you use multiprocessing.Pool. Fixing those issues and implementing more of the Pool interface (like apply(), and returning results from map()) is left as an exercise.
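
For the shutdown part, here is a minimal sketch of one possible approach, using a None sentinel per queue; the subclass StickyPoolWithShutdown and its method bodies are assumptions layered on the code above, not part of the original answer:

class StickyPoolWithShutdown(StickyPool):
    # Hypothetical extension adding the shutdown logic left as an exercise.
    def _run(self, queue):
        while True:
            item = queue.get()
            if item is None:      # sentinel: no more jobs for this worker
                break
            fn, arg = item
            fn(arg)

    def close(self):
        # Send one sentinel per worker so every loop can exit cleanly.
        for queue in self._inqueues:
            queue.put(None)

    def join(self):
        for process in self._pool:
            process.join()

pool = StickyPoolWithShutdown(3)
pool.map(work, [1, 2, 3, 4, 1, 3])
pool.close()
pool.join()   # the program now exits instead of hanging

Sentinel values are a common shutdown pattern for queue-fed workers; multiprocessing.Pool uses a similar idea internally.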

Upvotes: 3
