Reputation: 917
I'm curious to know what the best practices are for migrating entity schemas in Google App Engine. We use pipeline a lot, and my inclination was to build a pipeline task to handle this migration. This is what I came up with (in this example, I store whether the user's age is a prime number):
class MigrateUsers(pipeline.Pipeline):
    def run(self, keys):
        futures = []
        users = ndb.get_multi(keys)
        for user in users:
            user.age_is_prime = is_prime(user.age)
            futures.append(user.put_async())
        ndb.Future.wait_all(futures)

class Migration(pipeline.Pipeline):
    def run(self):
        all_results = []
        q = ds.User.query().filter()
        more = True
        next_cursor = None
        # Fetch user keys in batch and create MigrateUsers jobs
        while more:
            user_keys, next_cursor, more = \
                q.fetch_page(500, keys_only=True, start_cursor=next_cursor)
            all_results.append((yield MigrateUsers(keys=user_keys)))
        # Wait for them all to finish
        pipeline.After(*all_results)
My question really is, am I doing this right? It feels a little kludgy that my "Migration" task iterates over all the users in order to create segmented tasks. I did take a look at mapreduce, but I didn't get the feeling it was appropriate. I'd appreciate any advice, and if you're using mapreduce and wouldn't mind transforming my example, I'd really appreciate it.
Upvotes: 4
Views: 381
Reputation: 3192
I would strongly recommend looking into App Engine Task Queues for schema migrations. They're a lot easier to set up and operate than backends or MapReduce, IMO. You can find some info here: blog entry.
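To make the task queue idea concrete, here is a minimal sketch of the usual pattern: each task migrates one page of entities and then enqueues a follow-up task with the next cursor. It is plain Python so the control flow is easy to follow; `migrate_batch` and its four hook parameters are hypothetical names, not App Engine APIs. On App Engine you would presumably pass something like `deferred.defer` as `enqueue`, a wrapper around `Query.fetch_page` as `fetch_page`, and `ndb.put_multi` as `save`, with all hooks being module-level functions so they can be pickled.

```python
def migrate_batch(fetch_page, migrate, save, enqueue, cursor=None, batch_size=500):
    """Migrate one page of entities, then enqueue a task for the next page.

    All four callables are hypothetical hooks supplied by the caller:
      fetch_page(batch_size, cursor) -> (entities, next_cursor, more)
      migrate(entity)                -> updates the entity in place
      save(entities)                 -> writes the whole batch back
      enqueue(func, **kwargs)        -> schedules func(**kwargs) as a new task
    """
    entities, next_cursor, more = fetch_page(batch_size, cursor)
    for entity in entities:
        migrate(entity)
    save(entities)
    if more:
        # Chain the next page as a separate task instead of looping here,
        # so no single task has to walk the entire kind.
        enqueue(migrate_batch, fetch_page=fetch_page, migrate=migrate,
                save=save, enqueue=enqueue, cursor=next_cursor,
                batch_size=batch_size)
    return len(entities)
```

Because each task only touches one page, a failed task can be retried without redoing the pages before it, provided the `migrate` hook is safe to run twice on the same entity.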
Upvotes: 0
Reputation: 15143
MapReduce is great for migrations. In my own experience, a migration usually means I need to go over all my entities, update them, and then write them back to the datastore. In this case, I only really need the "map" part, and I don't need the "reduce" part of mapreduce.
The benefit of using mapreduce is that it'll automatically batch your entities over different instances in parallel, so your operation will complete much faster than running serially as in your pipeline example. The MR SDK has a DatastoreInputReader() that will fetch every entity of a given kind and call a map function on each. You just have to provide that map function:
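from mapreduce import operation as op

def prime_age_map(user_entity):
    # Compute the new property from the entity passed in by the input reader.
    user_entity.age_is_prime = is_prime(user_entity.age)
    # Only entities with a prime age are written back.
    if user_entity.age_is_prime:
        yield op.db.Put(user_entity)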
There is some boilerplate code I'm not including because I haven't switched to the latest SDK and what I have would probably be incorrect, but it should be pretty simple because you're only using half the pipeline.
I'm not sure how realistic your example is, but if it's real and you have many entities, it would be much better to precalculate the prime values (http://primes.utm.edu/lists/small/1000.txt - only the first 30 or so are reasonable age values), execute specific queries on those age values, and update only those entities instead of iterating over the entire Kind. You can do this using the MapReduce pipeline, but you'll have to modify the given DatastoreInputReader to issue a more specific query than fetching your entire Kind.
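As a rough illustration of the precalculation, the sketch below builds the set of prime ages once with a sieve of Eratosthenes and then answers each check by set membership. `MAX_AGE = 120` is an assumed upper bound, and `primes_up_to`, `PRIME_AGES` and `is_prime_age` are hypothetical names. The resulting values could then drive one equality query per age (something like `User.age == p`) rather than a scan over the whole Kind.

```python
def primes_up_to(limit):
    """Return all primes <= limit using a sieve of Eratosthenes."""
    if limit < 2:
        return []
    is_prime = [True] * (limit + 1)
    is_prime[0] = is_prime[1] = False
    for n in range(2, int(limit ** 0.5) + 1):
        if is_prime[n]:
            # Cross out every multiple of n, starting at n*n.
            for multiple in range(n * n, limit + 1, n):
                is_prime[multiple] = False
    return [n for n, flag in enumerate(is_prime) if flag]


MAX_AGE = 120  # assumption: no realistic user age exceeds this
PRIME_AGES = frozenset(primes_up_to(MAX_AGE))


def is_prime_age(age):
    """Constant-time check against the precomputed set of prime ages."""
    return age in PRIME_AGES
```

With this bound the set holds 30 values, which matches the "first 30 or so" estimate above.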
Upvotes: 3