Reputation: 335
I don't think what I want exists. Can anyone create for me a barebones mini mapper class? Detailed pseudo-code or actual Python is fine. Update: Simple, working version at bottom of post.
Update 2 - June 20:
Update 3 - June 21:
I hope this is useful to someone besides myself. I use it frequently, and it's a comfortable middle ground between MapReduce and raw cursors, for when you're not sure how many results you'll have to deal with.
The mapreduce lib for gae is great, but I want something lightweight and disposable. In the tutorial for python gae, you would often see db models being iterated over, modified, and saved. I don't think examples like that appear any more because, as we know, it is very inefficient: it calls the datastore once per loop iteration instead of batching. I like that interface, though, and I often find myself needing a simple and fast way to run through my db models.
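To make the cost concrete, here's a toy sketch (plain Python, no GAE APIs — the `put` function and the call counter are stand-ins invented for illustration) comparing how many backend calls the per-entity style makes versus a batched style:

```python
# Simulated backend: count how many "RPCs" each strategy makes.
calls = {"n": 0}

def put(entities):
    """Stand-in for db.put: accepts one entity or a list, costs one call."""
    calls["n"] += 1

entities = list(range(250))

# Naive tutorial style: one datastore call per entity.
calls["n"] = 0
for e in entities:
    put(e)
naive_calls = calls["n"]           # 250 calls

# Batched style: one call per batch of 100.
calls["n"] = 0
batch = []
for e in entities:
    batch.append(e)
    if len(batch) == 100:
        put(batch)
        batch = []
if batch:                          # flush the final partial batch
    put(batch)
batched_calls = calls["n"]         # 3 calls

print(naive_calls, batched_calls)  # -> 250 3
```

Same work, two orders of magnitude fewer round trips, which is the whole motivation for batching puts behind a loop-like interface.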
Usage
Behind the Scenes
This is where I need your help because I feel like I'm in over my head.
A generator (I've never used generators and only sort of understand them) batch-fetches datastore items (how many is safe to grab? is there a hard limit, or does it depend on item size?) and presents them as an iterable. Once batch_size items have been consumed, it batch-saves the pending items to the datastore and seamlessly grabs the next batch via a cursor.
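The cursor-batching generator idea can be sketched in plain Python, with a list standing in for the datastore and an integer offset standing in for the cursor (no GAE APIs here; `fetch_page` is a hypothetical helper):

```python
def iterall(fetch_page, batch_size=100):
    """Yield items one at a time, fetching them from the backing
    store in pages of batch_size (a stand-in for cursor paging)."""
    cursor = 0
    while True:
        page = fetch_page(cursor, batch_size)
        if not page:
            break
        for item in page:
            yield item
        if len(page) < batch_size:
            break  # a short page means we've reached the end
        cursor += len(page)

# Fake "datastore": a list, paged by offset.
data = list(range(10))

def fetch_page(cursor, batch_size):
    return data[cursor:cursor + batch_size]

print(list(iterall(fetch_page, batch_size=3)))
# -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The caller just sees a flat iterable; the paging (and, in the real class, the datastore cursor bookkeeping) stays hidden inside the generator.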
One thing I was considering was using defer to save the items to the db, the intent being to save some time when looping over many items. A possible downside is that code after the loop may expect the map to have finished, so I think it would be good to have a 'defer_db' flag the user can set or ignore. If you're only expecting a small number of items, you wouldn't set the defer flag.
Please contribute to this small project with code concepts. Accepted answer will be the one with the most upvotes after a week. Admittedly, I feel a bit dirty asking SO to come up with a solution for me, but sincerely, I don't feel up to the task. I hope you find it useful.
country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")
country_mim.order("population")

some_mim = MIM(SomeModel.all())
for x in some_mim.iterall():
    if x.foo == 'ham sandwich':
        sandwich_mim = MIM(MySandwiches.all())
        for sandwich in sandwich_mim.iterall():
            if 'ham' in sandwich.ingredients:
                print 'yay'

country_mim = MIM(CountryModels.all()).order("drinking_age")
for country in country_mim.iterall():
    if country.drinking_age > 21:  # these countries should be nuked from orbit
        country_mim.bdel(country)  # delete
    if country.drinking_age == 18:
        country.my_thoughts = "god bless you foreigners"
        country_mim.bput(country)  # save
    if country.drinking_age < 10:  # panic
        country.my_thoughts = "what is this i don't even..."
        country_mim.bput(country)
        break  # even though we panicked, the bput still resolves
I've been using this code for several weeks now and everything seems fine. Defer has now been included. The query facade code was stolen (with permission) from the great PagedQuery module. Supports batch saving and batch deleting.
import google.appengine.ext.db as db
from google.appengine.ext.deferred import defer

class MIM(object):
    """
    All standard Query functions (filter, order, etc.) supported*. Default batch
    size is 100. defer_db=True will cause put and delete datastore operations to
    be deferred. allow_func accepts any function you wish; only the entities
    for which the function returns a true value will be returned during
    iterall(). Using break/continue/return while iterating doesn't cause things
    to explode (like it did in the 1st version).

    * - thanks to http://code.google.com/p/he3-appengine-lib/wiki/PagedQuery
    """

    def __init__(self, query, batch_size=100, defer_db=False, allow_func=None):
        self._query = query
        self._batch_size = batch_size
        self._defer_db = defer_db
        self._allow_func = allow_func
        self._to_save = []
        self._to_delete = []
        # find out if we are dealing with another facade object
        if '_query' in query.__dict__:
            query_to_check = query._query
        else:
            query_to_check = query
        if isinstance(query_to_check, db.Query):
            self._query_type = 'Query'
        elif isinstance(query_to_check, db.GqlQuery):
            self._query_type = 'GqlQuery'
        else:
            raise TypeError('Query type not supported: ' + type(query).__name__)

    def iterall(self):
        "Return iterable over all datastore items matching query. Items are pulled from the db in batches."
        results = self._query.fetch(self._batch_size)  # init query
        savedCursor = self._query.cursor()             # init cursor
        try:
            while results:
                for item in results:
                    if self._allow_func:
                        if self._allow_func(item):
                            yield item
                    else:
                        yield item
                if len(results) == self._batch_size:
                    results = self._query.with_cursor(savedCursor).fetch(self._batch_size)
                    savedCursor = self._query.cursor()
                else:  # avoid an additional db call if the last batch wasn't full
                    results = []  # while loop will end and fall through to the else clause
            else:
                self._finish()
        except GeneratorExit:
            self._finish()

    def bput(self, item):
        "Batch save."
        self._to_save.append(item)
        if len(self._to_save) >= self._batch_size:
            self._bput_go()

    def bdel(self, item):
        "Batch delete."
        self._to_delete.append(item)
        if len(self._to_delete) >= self._batch_size:
            self._bdel_go()

    def _bput_go(self):
        if self._defer_db:
            defer(db.put, self._to_save)
        else:
            db.put(self._to_save)
        self._to_save = []

    def _bdel_go(self):
        if self._defer_db:
            defer(db.delete, self._to_delete)
        else:
            db.delete(self._to_delete)
        self._to_delete = []

    def _finish(self):
        "When done iterating through models, the last few pending items may not have been put/deleted yet."
        if self._to_save:
            self._bput_go()
        if self._to_delete:
            self._bdel_go()

    # FACADE SECTION >>>

    def fetch(self, limit, offset=0):
        return self._query.fetch(limit, offset)

    def filter(self, property_operator, value):
        self._check_query_type_is('Query')
        self._query = self._query.filter(property_operator, value)
        return self

    def order(self, property):
        self._check_query_type_is('Query')
        self._query.order(property)
        return self

    def ancestor(self, ancestor):
        self._check_query_type_is('Query')
        self._query.ancestor(ancestor)
        return self

    def count(self, limit=1000):
        return self._query.count(limit)

    def _check_query_type_is(self, required_query_type):
        if self._query_type != required_query_type:
            raise TypeError('Operation not allowed for query type ('
                            + type(self._query).__name__ + ')')
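The trick that makes break/continue/return safe is that a generator's pending frame receives GeneratorExit when it is closed, so `_finish()` runs on both the normal-completion path (the while/else clause) and the early-exit path. A minimal GAE-free sketch of that pattern (`batching_iter` and `flush` are illustrative names, not part of the class above):

```python
def batching_iter(items, flush):
    """Yield items; guarantee flush() runs whether the caller
    exhausts the iterator or abandons it early, mirroring how
    iterall() calls _finish() on both paths."""
    try:
        for item in items:
            yield item
        flush()          # normal completion
    except GeneratorExit:
        flush()          # caller broke out and the generator was closed

flushed = []
it = batching_iter([1, 2, 3], lambda: flushed.append(True))
for x in it:
    if x == 2:
        break            # abandon the iterator mid-stream
it.close()               # raises GeneratorExit inside the generator
print(flushed)           # -> [True]
```

CPython also closes abandoned generators at garbage collection, but calling close() (or letting the reference die) is what actually delivers GeneratorExit, so pending bput/bdel batches aren't lost when you break out of the loop.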
Upvotes: 3
Views: 561
Reputation: 101149
Why don't you want to use Mapreduce? It's designed for exactly this use-case, already does everything you want, and can be invoked programmatically. 'Lightweight' is a very vague term, but I'm not aware of any reason that the mapreduce library doesn't suit your task exactly - and there's very little reason to duplicate that functionality.
Upvotes: 1