Reputation: 13
I have the following program:
import string
import itertools
import multiprocessing as mp

def test(word_list):
    return list(map(lambda xy: (xy[0], len(list(xy[1]))),
                    itertools.groupby(sorted(word_list))))

def f(x):
    return (x[0], len(list(x[1])))

def test_parallel(word_list):
    w = mp.cpu_count()
    pool = mp.Pool(w)
    return (pool.map(f, itertools.groupby(sorted(word_list))))

def main():
    test_list = ["test", "test", "test", "this", "this", "that"]
    print(test(test_list))
    print(test_parallel(test_list))
    return

if __name__ == "__main__":
    main()
The output is:
[('test', 3), ('that', 1), ('this', 2)]
[('test', 0), ('that', 0), ('this', 1)]
The first line is the expected and correct result. My question is, why isn't pool.map() returning the same results as map()?
Also, I'm aware a 6 item list isn't the perfect case for multiprocessing. This is simply a demonstration of the issue I am having while implementing in a larger application.
I'm using Python 3.5.1.
Upvotes: 1
Views: 493
Reputation: 1122492
groupby() returns an iterator per group, and these are not independent of the underlying iterator passed in. You can't iterate over these groups independently in parallel; any preceding group is ended prematurely the moment you access the next one.

pool.map() will try to read all of the groupby() iterator results so it can send them to separate worker functions; merely trying to get a second group will cause the first to be empty.

You can see the same result without pool.map(), simply by iterating to the next result from groupby():
>>> from itertools import groupby
>>> word_list = ["test", "test", "test", "this", "this", "that"]
>>> iterator = groupby(sorted(word_list))
>>> first = next(iterator)
>>> next(first[1])
'test'
>>> second = next(iterator)
>>> list(first[1])
[]
The remainder of the first group is 'empty' because the second group has been requested.
This is clearly documented:
Because the source is shared, when the
groupby()
object is advanced, the previous group is no longer visible.
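The documented behaviour also points at the workaround: copy a group into a list before the groupby() object is advanced. A minimal sketch, reusing the word_list from the question:

```python
from itertools import groupby

word_list = ["test", "test", "test", "this", "this", "that"]
iterator = groupby(sorted(word_list))

key, group = next(iterator)
saved = list(group)   # materialise the group before advancing
next(iterator)        # advancing would otherwise empty `group`

print(key, saved)     # test ['test', 'test', 'test']
```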
You'd have to 'materialise' each group before sending it to the function:

return pool.map(lambda kg: f((kg[0], list(kg[1]))),
                itertools.groupby(sorted(word_list)))

(note that multiprocessing can't pickle a plain lambda, so in practice this variant needs a named top-level function)
or
return pool.map(f, (
    (key, list(group)) for key, group in itertools.groupby(sorted(word_list))))

where the generator expression takes care of the materialising as pool.map() iterates.
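You can check single-process that the generator expression hands f fully-realised pairs; here the builtin map stands in for pool.map, which consumes its input the same way (f and word_list are taken from the question's code):

```python
import itertools

def f(x):
    return (x[0], len(list(x[1])))

word_list = ["test", "test", "test", "this", "this", "that"]

# Each pair is fully realised before f sees it, so advancing the
# shared groupby iterator no longer empties earlier groups.
pairs = ((key, list(group))
         for key, group in itertools.groupby(sorted(word_list)))

result = list(map(f, pairs))
print(result)   # [('test', 3), ('that', 1), ('this', 2)]
```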
Upvotes: 2
Reputation: 60143
From https://docs.python.org/3.5/library/itertools.html#itertools.groupby:
The returned group is itself an iterator that shares the underlying iterable with groupby(). Because the source is shared, when the groupby() object is advanced, the previous group is no longer visible. So, if that data is needed later, it should be stored as a list:
groups = []
uniquekeys = []
data = sorted(data, key=keyfunc)
for k, g in groupby(data, keyfunc):
    groups.append(list(g))      # Store group iterator as a list
    uniquekeys.append(k)
I think the issue here is that Pool.map tries to chop up its input, and in doing so, it iterates through the result of groupby, which effectively skips over the elements of all but the last group.
One fix for your code would be to use something like [(k, list(v)) for k, v in itertools.groupby(sorted(word_list))], but I don't know how applicable that is to your real-world use case.
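As a rough single-process sketch of why the chopping-up matters: Pool.map realises its input up front, much like calling list() on the groupby object, and doing that by hand reproduces the question's wrong output (the last group may keep a stray element, but the earlier ones come out empty):

```python
import itertools

def f(x):
    return (x[0], len(list(x[1])))

word_list = ["test", "test", "test", "this", "this", "that"]

# list() realises every (key, group) pair up front, advancing the shared
# iterator past each group -- roughly what Pool.map does when chunking.
broken = [f(kv) for kv in list(itertools.groupby(sorted(word_list)))]
print(broken)   # e.g. [('test', 0), ('that', 0), ('this', 1)] -- groups already consumed

# Materialising each group before calling f restores the expected counts.
fixed = [f((k, list(v))) for k, v in itertools.groupby(sorted(word_list))]
print(fixed)    # [('test', 3), ('that', 1), ('this', 2)]
```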
Upvotes: 3