Reputation: 927
It looks like this question has been asked many times, and I'm aware that the datastore has a storage limit of 1MB per entity, as explained here.
But I'm still stuck...
Here is the code I use to defer tasks (largely taken from Google's sample Mapper code here):
class Mapper(object):
    """Iterates over every entity of a kind in pages, deferring a new task
    per page so the work can outlive a single request deadline.

    Subclasses set KIND (and optionally FILTERS) and override map(); the
    chain of deferred tasks is started with run().
    """

    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to filter by.
    FILTERS = []

    def __init__(self):
        # Entities queued for batched write/delete in _batch_write().
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables
        (to_update, to_delete).
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied."""
        query = self.KIND.query()
        for prop, value in self.FILTERS:
            query = query.filter(prop == value)
        return query

    def run(self, act_urlkey=None, batch_size=20):
        """Starts the mapper running.

        act_urlkey is currently unused but kept for caller compatibility.
        """
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            ndb.put_multi(self.to_put)
            self.to_put = []
        if self.to_delete:
            ndb.delete_multi(self.to_delete)
            self.to_delete = []

    def _continue(self, curs_str=None, batch_size=20):
        """Processes one page of results, then defers the next page.

        curs_str is a websafe cursor string from the previous page, or
        None on the first call.
        """
        logging.debug("entering _continue with curs_str: %s",
                      pprint.pformat(curs_str))
        query = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if curs_str is not None:
            curs = Cursor.from_websafe_string(curs_str)
            entities, next_curs, more = query.fetch_page(
                batch_size, start_cursor=curs)
        else:
            entities, next_curs, more = query.fetch_page(batch_size)
        try:
            for entity in entities:
                # NOTE(review): the (to_update, to_delete) return value of
                # map() is ignored here and _batch_write() is never called —
                # confirm whether subclasses rely on that, or whether results
                # should be accumulated and flushed per page.
                self.map(entity)
            if next_curs and more:
                # deferred.defer pickles the bound method, which pickles the
                # whole instance into the task payload. Any instance state
                # that grows per page grows the payload too and will
                # eventually exceed the datastore's ~1MB entity limit
                # (RequestTooLargeError). Keep instance state minimal.
                # sys.getsizeof is shallow, so it will NOT show this growth.
                deferred.defer(self._continue,
                               next_curs.to_websafe_string(),
                               batch_size)
            else:
                self.finish()
        except Exception:
            # Catch only real errors (a bare except would also trap
            # SystemExit/KeyboardInterrupt). logging.exception already
            # appends the current traceback; no manual formatting needed.
            logging.exception("Mapper._continue failed")
The code runs well and iterates through query pages a fairly large number of times, but inevitably fails with this exception after some time:
Traceback (most recent call last):
File "/base/data/home/apps/s~xxx-test/backendadmin:beta-0-11-9.388453478982695515/bp_content/themes/xxx/handlers/mappers.py", line 90, in _continue
deferred.defer(self._continue, next_curs.to_websafe_string(), batch_size)
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/ext/deferred/deferred.py", line 272, in defer
key = _DeferredTaskEntity(data=pickled).put()
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 1077, in put
return datastore.Put(self._entity, **kwargs)
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/datastore.py", line 605, in Put
return PutAsync(entities, **kwargs).get_result()
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 613, in get_result
return self.__get_result_hook(self)
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1881, in __put_hook
self.check_rpc_success(rpc)
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1371, in check_rpc_success
rpc.check_success()
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 579, in check_success
self.__rpc.CheckSuccess()
File "/base/data/home/runtimes/python27/python27_lib/versions/1/google/appengine/api/apiproxy_rpc.py", line 134, in CheckSuccess
raise self.exception
RequestTooLargeError: The request to API call datastore_v3.Put() was too large.
The logs show the sizes of the arguments passed to deferred.defer; right up until the error appears, they look like this (the sizes are small and never increase):
Mapper._continue - sys.getsizeof(self): 32, sys.getsizeof(next_curs.to_websafe_string()): 85, sys.getsizeof(batch_size): 12
Mapper._continue - sys.getsizeof(self): 32, sys.getsizeof(next_curs.to_websafe_string()): 85, sys.getsizeof(batch_size): 12
Where/how can I find the object that the datastore is trying to store and that is too large?
Upvotes: 0
Views: 145
Reputation: 927
OK, silly me — it turns out that I was storing a list as an instance variable of my child Mapper
class, which kept growing as the recursion went deeper and deeper. I removed it, and things run smoothly now.
sys.getsizeof
misled me into thinking that my object wasn't growing in size. See this for an explanation of what's wrong with getsizeof and why the object's size didn't appear to change, even though it really was growing.
Upvotes: 1