Reputation: 28059
I'm trying to implement a summary view onto a large(ish) data set using AppEngine.
My model looks something like:
class TxRecord(db.Model):
    expense_type = db.StringProperty()
    amount = db.IntegerProperty()

class ExpenseType(db.Model):
    name = db.StringProperty()
    total = db.IntegerProperty()
My datastore contains 100K instances of TxRecord and I'd like to summarise these by expense_type.
In SQL it would be something like:
select expense_type as name, sum(amount) as total
from TxRecord
group by expense_type
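For reference, that group-by-sum boils down to the following plain-Python computation (the sample records here are hypothetical, just to pin down what the query produces):

```python
from collections import defaultdict

# Hypothetical in-memory records standing in for TxRecord entities.
records = [
    {"expense_type": "travel", "amount": 100},
    {"expense_type": "meals", "amount": 40},
    {"expense_type": "travel", "amount": 60},
]

# Accumulate a running total per expense_type, defaulting to 0.
totals = defaultdict(int)
for rec in records:
    totals[rec["expense_type"]] += rec["amount"]

# totals now maps each expense_type to its summed amount:
# {"travel": 160, "meals": 40}
```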
What I'm currently doing is using the Python MapReduce framework to iterate over all of the TxRecords
using the following mapper:
def generate_expense_type(rec):
    expense_type = ExpenseType.get_or_insert(rec.expense_type,
                                             name=rec.expense_type)
    expense_type.total += rec.amount
    yield op.db.Put(expense_type)
This seems to work, but I feel I have to run it with a shard_count
of 1 in order to ensure that the total isn't overwritten by concurrent writes.
Is there a strategy I can use to overcome this issue on AppEngine, or is that it?
Upvotes: 2
Views: 317
Reputation: 101149
Using mapreduce is the right approach. Counters, as David suggests, are one option, but they're not reliable (they use memcache), and they're not designed for massive numbers of counters to be kept in parallel.
Your current mapreduce has a couple of issues. First, get_or_insert
executes a datastore transaction every time it's called. Second, you then update the total outside that transaction and asynchronously store the entity a second time, creating exactly the concurrency issue you were concerned about.
At least until reduce is fully supported, your best option is to do the whole update in the mapper in a transaction, like this:
def generate_expense_type(rec):
    def _tx():
        expense_type = ExpenseType.get_by_key_name(rec.expense_type)
        if not expense_type:
            expense_type = ExpenseType(key_name=rec.expense_type,
                                       name=rec.expense_type, total=0)
        expense_type.total += rec.amount
        expense_type.put()
    db.run_in_transaction(_tx)
Upvotes: 3
Reputation: 8292
MapReduce is great for offline processing of data, and I like David's solution for handling the counters (+1 upvote).
I just wanted to mention another option: process the data as it comes in. Check out Brett Slatkin's High Throughput Data Pipelines on App Engine talk from IO 2010.
I've implemented the technique in a simple framework (slagg); you might find my example of grouping with date rollup useful.
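The core idea behind processing data as it arrives is to avoid hammering a single total entity with every write. One classic App Engine pattern for that is sharded counters: spread increments across N shards and sum them on read. A minimal in-memory sketch of the idea (plain Python, no datastore; the shard count and dict stand in for shard entities):

```python
import random

# Number of shards to spread writes across; in the datastore version,
# each (expense_type, shard_id) pair would be its own entity.
NUM_SHARDS = 20
shards = {}  # (expense_type, shard_id) -> partial total

def increment(expense_type, amount):
    # Pick a shard at random so concurrent writers rarely collide
    # on the same shard.
    shard_id = random.randrange(NUM_SHARDS)
    key = (expense_type, shard_id)
    shards[key] = shards.get(key, 0) + amount

def total(expense_type):
    # Reads sum across all shards for the given type.
    return sum(v for (name, _), v in shards.items() if name == expense_type)
```

In the real datastore version each shard is a separate entity updated in its own transaction, so write throughput scales roughly with the shard count.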
Upvotes: 3
Reputation: 16243
Using the MapReduce framework is a good idea. You could use more than one shard if you utilize the counters provided by the MapReduce framework. So instead of modifying the datastore each time, you could do something like this:
yield op.counters.Increment("total_<expense_type_name>", rec.amount)
After the MapReduce finishes (hopefully much more quickly than when you were using just one shard), then you can copy the finalized counters into your datastore entity.
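The copy step might look like the sketch below. How you read the finished job's counters depends on the mapreduce library's API (e.g. via its job state), so this version just takes the counter map as a plain dict and a persistence callback; the "total_" prefix convention matches the Increment call above:

```python
# Counter names are assumed to follow the "total_<expense_type_name>"
# convention from the mapper's Increment call.
PREFIX = "total_"

def finalize_counters(counters, put_entity):
    """Copy finished-job counters into per-type totals.

    counters:   dict of counter name -> accumulated value, as read
                from the completed MapReduce job (API-specific).
    put_entity: callback taking (expense_type_name, total); in the real
                app this would write the ExpenseType entity.
    """
    for counter_name, value in counters.items():
        if counter_name.startswith(PREFIX):
            put_entity(counter_name[len(PREFIX):], value)
```

Injecting the persistence callback keeps the copy logic independent of both the mapreduce library and the datastore.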
Upvotes: 3