Reputation: 171
I am currently working with two Datastore entities, Filing and Document. A Filing can have multiple Documents:
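For context, the models look roughly like this (a minimal sketch reconstructed from the details further down in this question; the content field name is an assumption):

from google.appengine.ext import ndb

class Filing(ndb.Model):
    # Keyed by the filing ID, e.g. ndb.Key("Filing", "<FilingID>").
    pass

class Document(ndb.Model):
    # Keyed by an ID of the form "FilingID_documentNumber".
    filing = ndb.KeyProperty(kind="Filing")      # missing on some entities
    content = ndb.BlobProperty(compressed=True, indexed=False)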
Currently our users can look up Filings, but while adding the data I noticed that there was a problem with some of the Document entities: the KeyProperty pointing to the Filing is missing (problems with the parser). Since my documents have an individual ID of the format FilingID_documentNumber, I decided to use appengine-mapreduce to add the missing KeyProperty pointing to the Filing, so that I can get all Documents for a given Filing.
So I created the following MapReduce job:
import gc

from google.appengine.ext import ndb
from mapreduce import base_handler, mapreduce_pipeline
from mapreduce import operation as op


@ndb.toplevel
def map_callback(ed):
    gc.collect()
    try:
        if ed.filing is None:
            # Derive the parent Filing key from the "FilingID_documentNumber" ID.
            ed.filing = ndb.Key("Filing", ed.key.id().split("_")[0])
            yield op.db.Put(ed)
            yield op.counters.Increment("n_documents_fixed")
    except Exception:
        yield op.counters.Increment("saving_error")


class FixDocs(base_handler.PipelineBase):
    def run(self, *args, **kwargs):
        """ run """
        mapper_params = {
            "input_reader": {
                "entity_kind": "datamodels.Document",
                "batch_size": 10,  # default is 50
            }
        }
        yield mapreduce_pipeline.MapperPipeline(
            "Fix Documents",
            handler_spec="search.mappers.fix_documents.map_callback",
            input_reader_spec="mapreduce.input_readers.DatastoreInputReader",
            params=mapper_params,
            shards=128)
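For reference, the job is kicked off along these lines (a sketch; the actual handler that starts it is omitted):

# Started from an admin request handler (sketch).
job = FixDocs()
job.start()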
My problem is that I am currently not able to run this mapper, as I am running into a lot of memory errors. In the logs I notice many of the shards failing with the following error:
Exceeded soft private memory limit of 128 MB with 154 MB after servicing 2 requests total While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
I tried:
- adding @ndb.toplevel on the mapper
- reducing the batch_size, as the default is 50 and I thought that the documents could get considerably big (one of their fields is a BlobProperty(compressed=True, indexed=False))
But neither modification seemed to improve the execution of the job.
Does anyone have an idea of how this could be improved? I also read that Dataflow might be suited for this; does anyone have experience using Dataflow to update Datastore?
Upvotes: 0
Views: 58
Reputation: 39834
The error message indicates that your app uses either an F1 (default for automatic scaling) or a B1 instance class, which have a 128 MB memory limit.
One thing you could try would be to configure an instance class with more memory (which also happens to be faster) in your app.yaml file. See also the instance_class row in the Syntax table.
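For example, something along these lines in app.yaml raises the limit (a sketch; the runtime lines are assumptions and only the instance_class line is the relevant change - F2/F4 are the larger automatic scaling classes, use B2/B4/B8 with basic or manual scaling):

# app.yaml (sketch) - instance_class is the relevant change here
runtime: python27
api_version: 1
threadsafe: true

instance_class: F4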
Side note: when I bumped my instance class higher for the same reason, I also noticed that the gc.collect() calls I had in my code started to be visibly effective. I'm not entirely sure why; I suspect the faster instance and the higher memory limit gave them enough time to kick in before the instance was killed. This should help as well.
Upvotes: 1