I need to run a function for each of the documents in my database.
When I try the following:
from multiprocessing import Pool
from pymongo import Connection

def foo(item):
    ...  # process a single document; map() passes each one in as `item`

connection1 = Connection('127.0.0.1', 27017)
db1 = connection1.data
my_pool = Pool(6)
my_pool.map(foo, db1.index.find())
I'm getting the following error:
Job 1, 'python myscript.py ' terminated by signal SIGKILL (Forced quit)
I think this is caused by db1.index.find() eating all the available RAM while trying to return millions of database documents.
How should I modify my code for it to work?
Here are some logs:
dmesg | tail -500 | grep memory
[177886.768927] Out of memory: Kill process 3063 (python) score 683 or sacrifice child
[177891.001379] [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
[177891.021362] Out of memory: Kill process 3063 (python) score 684 or sacrifice child
[177891.025399] [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
The actual function is below:
import re
import string

from nltk.tokenize import WhitespaceTokenizer  # assumed: NLTK's whitespace tokenizer
from pymongo import Connection

def create_barrel(item):
    connection = Connection('127.0.0.1', 27017)
    db = connection.data
    print db.index.count()
    barrel = []
    fls = []
    name = None  # stays None if the item has neither 'name' nor 'name.utf-8'
    if 'name' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name']))
        name = item['name']
    elif 'name.utf-8' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8']))
        name = item['name.utf-8']
    else:
        print item.keys()
    if 'files' in item.keys():
        for file in item['files']:
            if 'path' in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path'])))
                fls.append(("\\".join(file['path']), file['length']))
            elif 'path.utf-8' in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8'])))
                fls.append(("\\".join(file['path.utf-8']), file['length']))
            else:
                print file
                barrel.append(WhitespaceTokenizer().tokenize(file))
    if len(fls) < 1:
        fls.append((name, item['length']))
    barrel = sum(barrel, [])
    vs = []
    for s in barrel:
        # collect version strings, i.e. numbers such as 4.2.7500, from every token
        vs.extend(re.findall(r"\d[\d|\.]*\d", s))
    b0 = []
    for s in barrel:
        b0.append(re.split("[" + string.punctuation + "]", s))
    b1 = filter(lambda x: x not in string.punctuation, sum(b0, []))
    flag = True
    while flag:
        bb = []
        flag = False
        for bt in b1:
            if bt[0] in string.punctuation:
                bb.append(bt[1:])
                flag = True
            elif bt[-1] in string.punctuation:
                bb.append(bt[:-1])
                flag = True
            else:
                bb.append(bt)
        b1 = bb
    b2 = b1 + barrel + vs
    b3 = list(set(b2))
    b4 = map(lambda x: x.lower(), b3)
    b_final = {}
    b_final['_id'] = item['_id']
    b_final['tags'] = b4
    b_final['name'] = name
    b_final['files'] = fls
    print db.barrels.insert(b_final)
I've noticed an interesting thing. When I press Ctrl+C to stop the process, I get the following:
python index2barrel.py
Traceback (most recent call last):
  File "index2barrel.py", line 83, in <module>
    my_pool.map(create_barrel, db1.index.find, 6)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async
    iterable = list(iterable)
TypeError: 'instancemethod' object is not iterable
Why is multiprocessing trying to convert something to a list? Isn't that the source of the problem?
From the strace output:
brk(0x231ccf000) = 0x231ccf000
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1
sendto(3, "+\0\0\0\260\263\355\356\0\0\0\0\325\7\0\0\0\0\0\0data.index\0\0"..., 43, 0, NULL, 0) = 43
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663
... [repeated many times]
brk(0x2320d5000) = 0x2320d5000
... [repeated many times]
The sample above repeats over and over in the strace output, and for some reason strace -o logfile python myscript.py never halts. It just eats all the available RAM and keeps writing to the log file.
UPDATE: Using imap instead of map solved my problem.
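For reference, a minimal sketch of the imap() version. It assumes a hypothetical fake_cursor() generator in place of db1.index.find() and a hypothetical create_barrel_stub() in place of create_barrel(), and it uses ThreadPool (from multiprocessing.pool) only so the sketch runs without pickling or a MongoDB server; multiprocessing.Pool accepts the same imap(func, iterable, chunksize) arguments.

```python
from multiprocessing.pool import ThreadPool  # same imap() arguments as multiprocessing.Pool


def fake_cursor(n):
    """Hypothetical stand-in for db1.index.find(): yields documents one at a time."""
    for i in range(n):
        yield {"_id": i, "name": "item %d" % i}


def create_barrel_stub(item):
    """Hypothetical stand-in for create_barrel(); returns something small."""
    return item["_id"]


def process_all(docs, workers=6, chunksize=100):
    """Feed every document to the pool with imap() and count the results."""
    pool = ThreadPool(workers)
    try:
        processed = 0
        # Unlike map(), imap() does not call list() on `docs` up front;
        # results are yielded one at a time, in input order.
        for _ in pool.imap(create_barrel_stub, docs, chunksize):
            processed += 1
        return processed
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(process_all(fake_cursor(10000)))  # prints 10000
```

The difference from map() is that the cursor is not turned into a list before the first task starts, and the results are consumed one by one instead of being collected into one big list.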
Since find() returns a cursor that is passed to the map function, and since you say that this runs without a problem:
for item in db1.index.find():
    create_barrel(item)
it looks like the create_barrel function is OK.
Can you try limiting the number of results returned by the cursor to see if this helps? I think the syntax would be:
db1.index.find().limit(100)
Trying this might help pin down the cause of the problem.
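A sketch of that kind of trial run, assuming a hypothetical fake_cursor() in place of db1.index.find() and a hypothetical trial_run() helper. itertools.islice plays the role of .limit(100) here; with a real cursor, .limit(100) does the same on the server side.

```python
from itertools import islice


def fake_cursor(n):
    """Hypothetical stand-in for db1.index.find()."""
    for i in range(n):
        yield {"_id": i}


def trial_run(func, docs, limit=100):
    """Run func serially on only the first `limit` documents.

    islice() stands in for cursor.limit(limit): items past the limit
    are never pulled from the iterator.
    """
    done = 0
    for item in islice(docs, limit):
        func(item)
        done += 1
    return done
```

If trial_run(create_barrel, db1.index.find(), 100) finishes quickly, the per-document function is fine and the problem is in how the full cursor is handed to the pool.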
EDIT1: I think using the map function is the wrong approach here. You could use map_reduce in the Mongo Python driver instead; that way the map function is executed by the mongod process.
The map() function hands the items to the worker processes in chunks. By default the chunk size is calculated like this (in multiprocessing/pool.py):
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
This probably results in a chunk size that is too big in your case, which lets the process run out of memory. Try setting the chunk size manually, like this:
my_pool.map(foo, db1.index.find(), 100)
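To put numbers on the formula above: assuming a hypothetical 3,000,000 documents and Pool(6) as in the question, the default works out to 125,000 documents per task. The helper below (default_chunksize is a hypothetical name) mirrors the Python 2.7 pool.py logic, including the round-up step that follows the divmod line.

```python
def default_chunksize(n_items, n_workers):
    """Mirror of Pool.map_async's default chunk size (Python 2.7 pool.py)."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so every item lands in some chunk
    return chunksize


# Hypothetical size: 3 million documents, Pool(6) as in the question.
print(default_chunksize(3000000, 6))  # prints 125000
```

Note that a smaller chunksize only changes how the items are split into tasks; map() still calls list() on a cursor that has no __len__ before it computes any chunks, as EDIT3 below explains.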
EDIT: You should also consider reusing the database connection and closing it after use. Right now you create a new connection for each item and never call close() on it.
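One way to follow this, sketched with a hypothetical FakeConnection standing in for pymongo's Connection (only close() mirrors the real class; insert() here is a stand-in for inserting into a collection): group the cursor into batches, open one connection per batch, and close it with contextlib.closing. The names batches() and process_batch() are hypothetical too.

```python
from contextlib import closing
from itertools import islice


class FakeConnection(object):
    """Hypothetical stand-in for a database connection; counts opens and closes."""
    opened = 0
    closed_count = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.inserted = []

    def insert(self, doc):
        self.inserted.append(doc)

    def close(self):
        FakeConnection.closed_count += 1


def batches(iterable, size):
    """Yield lists of up to `size` items from any iterable, such as a cursor."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def process_batch(batch, connect=FakeConnection):
    """Open one connection for a whole batch and close it when done."""
    with closing(connect()) as conn:
        for item in batch:
            conn.insert(item)  # the real code would build and insert a barrel here
    return len(batch)


if __name__ == "__main__":
    done = sum(process_batch(b) for b in batches(range(250), 100))
    print("%d documents, %d connections" % (done, FakeConnection.opened))
```

With a pool, something like my_pool.imap(process_batch, batches(db1.index.find(), 100)) would then open one connection per 100 documents instead of one per document.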
EDIT2: Also check whether the while loop gets into an infinite loop (that would explain the symptoms).
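One way to rule the loop out, as a sketch rather than a drop-in replacement: str.strip(string.punctuation) removes all leading and trailing punctuation in a single call, so there is nothing to repeat. Unlike the original loop, this hypothetical strip_punctuation() helper drops tokens that end up empty instead of indexing into them (bt[0] on an empty string raises IndexError).

```python
import string


def strip_punctuation(tokens):
    """Strip leading/trailing punctuation from each token; drop empty results.

    str.strip(chars) removes every leading and trailing character found in
    `chars`, so one pass does what the `while flag:` loop does repeatedly.
    """
    cleaned = []
    for token in tokens:
        token = token.strip(string.punctuation)
        if token:  # skip tokens that were nothing but punctuation
            cleaned.append(token)
    return cleaned


if __name__ == "__main__":
    print(strip_punctuation(["..a..", "b", "--", "c!", "(4.2)"]))  # prints ['a', 'b', 'c', '4.2']
```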
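EDIT3: Based on the traceback you added, the map function tries to convert the cursor to a list, which fetches all the items at once. (In that particular run, db1.index.find was passed without parentheses, so list() received the method itself, hence the "'instancemethod' object is not iterable" error.) This happens because map() needs to know how many items there are. This is the relevant part of pool.py (in map_async(), which map() calls):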
if not hasattr(iterable, '__len__'):
    iterable = list(iterable)
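A small demonstration of that check, assuming a generator in place of the cursor: the worker function records how many items the generator had already produced at the moment it runs. observe_map() is a hypothetical helper; ThreadPool is used so the nested functions need no pickling, and it subclasses multiprocessing.pool.Pool, inheriting the same map().

```python
from multiprocessing.pool import ThreadPool  # subclass of multiprocessing.pool.Pool


def observe_map(n, workers=2):
    """Return (results, smallest 'pulled' count seen by any task)."""
    state = {"pulled": 0, "seen": []}

    def cursor():
        # Generator with no __len__, like a pymongo cursor.
        for i in range(n):
            state["pulled"] += 1
            yield i

    def work(item):
        # Record how far the generator had been consumed when this task ran.
        state["seen"].append(state["pulled"])
        return item * 2

    pool = ThreadPool(workers)
    try:
        results = pool.map(work, cursor())
    finally:
        pool.close()
        pool.join()
    return results, min(state["seen"])
```

Every task sees the full count, i.e. the whole iterable was drained into a list before any work started. With a pymongo cursor, that means every document is pulled into memory first.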
You could try this to avoid the conversion to a list:
cursor = db1.index.find()
cursor.__len__ = cursor.count()
my_pool.map(foo, cursor)
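One caveat with the snippet above: assigning cursor.__len__ on the instance makes hasattr() return True, so list() is skipped, but len() looks special methods up on the class, so the len(iterable) call that map_async() makes next still raises TypeError. A small wrapper class avoids that. This is a sketch assuming the pymongo 2.x-era Cursor.count() used above; FakeCursor, SizedCursor and run_map are hypothetical names, and ThreadPool again stands in for Pool so the sketch runs without a database.

```python
from multiprocessing.pool import ThreadPool  # same map() code path as multiprocessing.Pool


class FakeCursor(object):
    """Hypothetical stand-in for a cursor: iterable, with count(), no __len__."""

    def __init__(self, docs):
        self._docs = list(docs)

    def count(self):
        return len(self._docs)

    def __iter__(self):
        return iter(self._docs)


class SizedCursor(object):
    """Give a cursor a real __len__ so Pool.map skips list(cursor).

    count() has to match what the cursor actually yields, because map()
    sizes its chunks and result list from len().
    """

    def __init__(self, cursor):
        self._cursor = cursor
        self._size = cursor.count()

    def __len__(self):
        return self._size

    def __iter__(self):
        return iter(self._cursor)


def instance_len_attempt():
    """Reproduce the instance-attribute trick: (hasattr result, does len() work?)."""
    c = FakeCursor(range(3))
    c.__len__ = c.count()
    try:
        len(c)
        works = True
    except TypeError:  # len() ignores __len__ set on the instance
        works = False
    return hasattr(c, "__len__"), works


def run_map(docs, workers=2):
    pool = ThreadPool(workers)
    try:
        return pool.map(lambda d: d * 2, SizedCursor(FakeCursor(docs)))
    finally:
        pool.close()
        pool.join()
```

With a real cursor this would be my_pool.map(foo, SizedCursor(db1.index.find()), 100).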