Reputation: 21
I'm making a test harness. The tests are going to operate on large data sets (in the hundreds of gigabytes), and they're going to run in separate processes. I'd prefer to avoid copying the data on disk for each process to use, so I'm reading it once in the harness and pushing it to the tests through queues.
That was the plan, but when the tests don't all operate on the same data in the same format, they end up receiving each other's data. I thought I was inadvertently using the same queue for them all, but it really doesn't look like it.
Tested on Python 3.6 on Redhat 6.7 and Windows 7.
from multiprocessing import Manager
from threading import Thread
from concurrent.futures import ProcessPoolExecutor
class QueueSplitter(object):
def __init__(self, queues=[]):
self.queues = queues
def append(self, q):
self.queues.append(q)
def put(self, obj):
for q in self.queues:
q.put(obj)
def close(self):
self.queues = []
self.done = True
class IterQueueSplitter(QueueSplitter):
def __init__(self, it, sentinel=None, queues=[]):
self.it = it
self.sentinel = sentinel
self.queues = queues
def send(self):
try:
self.put(next(self.it))
except StopIteration:
self.put(self.sentinel)
self.close()
def serve(server):
while not hasattr(server, 'done'):
server.send()
def consume(me, q):
for v in iter(q.get, None):
print('consumer %d: %d' % (me, v))
def repeatabunch(n):
for i in range(100):
yield n
if __name__ == '__main__':
with Manager() as man, ProcessPoolExecutor(4) as ex:
consumers = []
producers = []
servers = []
for i in range(8):
queue = man.Queue()
consumer = ex.submit(consume, i, queue)
consumers.append(consumer)
server = IterQueueSplitter(repeatabunch(i))
server.append(queue)
servers.append(server)
producers.append(Thread(target=serve, args=[server]))
for t in producers:
t.start()
for t in producers:
t.join()
for consumer in consumers:
consumer.result()
How do I get objects to stay in their queue?
Upvotes: 0
Views: 25
Reputation: 21
Turns out, I forgot how default arguments work. The queues=[]
wasn't creating a new list every time the method was called, it was creating a list upon declaration and reusing it every time, meaning that everything was going to every queue.
The correct class definitions look like this:
class QueueSplitter(object):
def __init__(self, queues=None):
self.queues = queues or []
def append(self, q):
self.queues.append(q)
def put(self, obj):
for q in self.queues:
q.put(obj)
def close(self):
self.queues = []
self.done = True
class IterQueueSplitter(QueueSplitter):
def __init__(self, it, sentinel=None, queues=None):
self.it = it
self.sentinel = sentinel
self.queues = queues or []
def send(self):
try:
self.put(next(self.it))
except StopIteration:
self.put(self.sentinel)
self.close()
Upvotes: 1