Milo

Reputation: 335

Mini Iterable Model Mapper for Google App Engine Python

I don't think what I want exists. Can anyone create a barebones mini mapper class for me? Detailed pseudo-code or actual Python is fine. Update: Simple, working version at bottom of post.

Update 2 - June 20:

Update 3 - June 21:

I hope this is useful to someone besides myself. I use it frequently and it's a comfortable middle ground between the full MapReduce library and raw cursors, for when you're not sure how many results you'll have to deal with.


What is this about?

The mapreduce lib for gae is great, but I want something lightweight and disposable. In the tutorials for python gae you would often see db models being iterated over, modified and saved. I don't think there are many examples like that anymore because, as we know, it's very inefficient: it calls the datastore once per loop iteration instead of batching. I like that interface though, and I often find myself needing a simple and fast way to run through my db models.
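
To make the inefficiency concrete, here's the tutorial-style loop versus a batched version (MyModel and its counter property are made-up stand-ins):

from google.appengine.ext import db

# Tutorial style: one datastore write RPC per entity.
for entity in MyModel.all():
    entity.counter += 1
    entity.put()            # N entities -> N round trips

# Batched: collect modified entities, then write them in one RPC.
batch = []
for entity in MyModel.all():
    entity.counter += 1
    batch.append(entity)
db.put(batch)               # N entities -> 1 round trip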

What would it look like?

Usage

  1. Import the class.
  2. Tell it what model you want to map over.
  3. Give it optional query filters.
  4. Get an iterator object.
  5. Loop away, safe in the knowledge that you aren't making thousands of unnecessary db calls (see the sketch after this list).
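
Concretely, the steps might look like this (a sketch using the MIM class from the bottom of the post; the model and filter are borrowed from the examples below, and the models import path is just a placeholder):

from MiniIterMapper import MIM
from models import CountryModels   # wherever your models live

# Steps 2-4: wrap a query, add an optional filter, get the iterator.
country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")

# Step 5: iterate; entities arrive from the datastore in batches.
for country in country_mim.iterall():
    print country.spoken_language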

Behind the Scenes

This is where I need your help because I feel like I'm in over my head.

A generator (I've never used generators and only sort of understand them) batch-grabs datastore items (how many is safe to grab? is there a hard limit, or does it depend on item size?) and presents them in an iterable fashion. Once the batch size has been reached, it batch-saves the items to the datastore and grabs the next batch (via cursor) seamlessly.
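
Here's the core batching idea on its own, for anyone else who is fuzzy on generators (a minimal sketch against the old db API; nothing below is MIM-specific):

def iter_in_batches(query, batch_size=100):
    "Yield entities from a db.Query one by one, fetching them in cursor'd batches."
    results = query.fetch(batch_size)
    while results:
        cursor = query.cursor()        # remember where this batch ended
        for entity in results:
            yield entity               # hand out one entity at a time
        if len(results) < batch_size:  # short batch: nothing left to fetch
            return
        results = query.with_cursor(cursor).fetch(batch_size)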

One thing I was considering was using defer to save the items to the db, with the intent of saving some time when looping over many items. A possible downside is that the next section of code may expect the map to have finished, so I think it would be good to have a 'defer_db' flag that is set or ignored depending on user preference. If you're only expecting a small number of items, you wouldn't set the defer flag.
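
The flag would just pick between two write paths; deferred.defer is the real helper from the SDK, the rest of this is a sketch:

from google.appengine.ext import db
from google.appengine.ext.deferred import defer

def flush_batch(batch, defer_db=False):
    "Write a batch of entities now, or push the write onto the task queue."
    if defer_db:
        defer(db.put, batch)   # returns immediately; a task does the write
    else:
        db.put(batch)          # blocks until the write completes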

Conclusion

Please contribute to this small project with code concepts. The accepted answer will be the one with the most upvotes after a week. Admittedly, I feel a bit dirty asking SO to come up with a solution for me, but sincerely, I don't feel up to the task. I hope you find it useful.

Examples

Same Query Functions

country_mim = MIM(CountryModels.all()).filter("spoken_language =", "French")
country_mim.order("population")

Nested Iteration

some_mim = MIM(SomeModel.all())
for x in some_mim.iterall():
    if x.foo == 'ham sandwich':
        sandwich_mim = MIM(MySandwiches.all())
        for sandwich in sandwich_mim.iterall():
            if 'ham' in sandwich.ingredients:
                print 'yay'

Batch Saving & Deleting

country_mim = MIM(CountryModels.all()).order("drinking_age")
for country in country_mim.iterall():
    if country.drinking_age > 21:   # these countries should be nuked from orbit
        country_mim.bdel(country)   # delete
    if country.drinking_age == 18:
        country.my_thoughts = "god bless you foreigners"
        country_mim.bput(country)   # save
    if country.drinking_age < 10:   # panic
        country.my_thoughts = "what is this i don't even..."
        country_mim.bput(country)
        break   # even though we panicked, the bput still resolves

Some Code: MiniIterMapper.py

I've been using this code for several weeks now and everything seems fine. Deferred puts/deletes are supported via the defer_db flag. The query facade code was stolen (with permission) from the great PagedQuery module. Supports batch saving and batch deleting.

import google.appengine.ext.db as db
from google.appengine.ext.deferred import defer

class MIM(object):
    """
    All standard Query functions (filter, order, etc) supported*. Default batch
    size is 100. defer_db=True will cause put and delete datastore operations to
    be deferred. allow_func accepts any function you wish and only the entities
    that cause the function to return a true value will be returned during
    iterall(). Using break/continue/return while iterating doesn't cause things
    to explode (like it did in the 1st version).

    * - thanks to http://code.google.com/p/he3-appengine-lib/wiki/PagedQuery
    """

    def __init__(self, query, batch_size=100, defer_db=False, allow_func=None):

        self._query =       query
        self._batch_size =  batch_size
        self._defer_db =    defer_db
        self._allow_func =  allow_func
        self._to_save =     []
        self._to_delete =   []

        # find out if we are dealing with another facade object
        if '_query' in query.__dict__: query_to_check = query._query
        else: query_to_check = query

        if isinstance(query_to_check, db.Query):        self._query_type = 'Query'
        elif isinstance(query_to_check, db.GqlQuery):   self._query_type = 'GqlQuery'
        else: raise TypeError('Query type not supported: ' + type(query).__name__)

    def iterall(self):
        "Return iterable over all datastore items matching query. Items pulled from db in batches."

        results =               self._query.fetch(self._batch_size) # init query
        savedCursor =           self._query.cursor()                # init cursor

        try:
            while results:

                for item in results:
                    if self._allow_func:
                        if self._allow_func(item):
                            yield item
                    else:
                        yield item

                if len(results) ==  self._batch_size:
                    results =       self._query.with_cursor(savedCursor).fetch(self._batch_size)
                    savedCursor =   self._query.cursor()

                else:                   # avoid additional db call if we don't have max amount
                    results =       []  # while loop will end, and go to else section.
            else:
                self._finish()
        except GeneratorExit:
            self._finish()

    def bput(self, item):
        "Batch save."
        self._to_save.append(item)
        if len(self._to_save) >= self._batch_size:
            self._bput_go()

    def bdel(self, item):
        "Batch delete."
        self._to_delete.append(item)
        if len(self._to_delete) >= self._batch_size:
            self._bdel_go()

    def _bput_go(self):
        if self._defer_db:
            defer(db.put, self._to_save)
        else: db.put(self._to_save)
        self._to_save = []

    def _bdel_go(self):
        if self._defer_db:
            defer(db.delete, self._to_delete)
        else: db.delete(self._to_delete)
        self._to_delete = []

    def _finish(self):
        "When done iterating through models, could be that the last few remaining weren't put/deleted yet."
        if self._to_save:   self._bput_go()
        if self._to_delete: self._bdel_go()

    # FACADE SECTION >>>

    def fetch(self, limit, offset=0):
        return self._query.fetch(limit, offset)

    def filter(self, property_operator, value):
        self._check_query_type_is('Query')
        self._query = self._query.filter(property_operator, value)
        return self

    def order(self, property):
        self._check_query_type_is('Query')
        self._query.order(property)
        return self

    def ancestor(self, ancestor):
        self._check_query_type_is('Query')
        self._query.ancestor(ancestor)
        return self

    def count(self, limit=1000):
        return self._query.count(limit)

    def _check_query_type_is(self, required_query_type):
        if self._query_type != required_query_type:
            raise TypeError('Operation not allowed for query type ('
                            + type(self._query).__name__ + ')')

Upvotes: 3

Views: 561

Answers (1)

Nick Johnson

Reputation: 101149

Why don't you want to use MapReduce? It's designed for exactly this use case, already does everything you want, and can be invoked programmatically. 'Lightweight' is a very vague term, but I'm not aware of any reason that the mapreduce library doesn't suit your task exactly - and there's very little reason to duplicate that functionality.
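
For reference, programmatic invocation looks roughly like this (a sketch; the mymodule path and entity kind are placeholders, and exact parameters depend on your mapreduce library version):

from mapreduce import control
from mapreduce import operation as op

def touch_country(entity):
    "Mapper: called once per entity; yielded operations are batched for you."
    entity.my_thoughts = 'visited by mapreduce'
    yield op.db.Put(entity)

control.start_map(
    name='touch countries',
    handler_spec='mymodule.touch_country',   # dotted path to the mapper above
    reader_spec='mapreduce.input_readers.DatastoreInputReader',
    mapper_parameters={'entity_kind': 'models.CountryModels'})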

Upvotes: 1
