Gareth Davis

Reputation: 28059

How to summarise data in Google AppEngine

I'm trying to implement a summary view on to a large(ish) data set using AppEngine.

My model looks something like:

class TxRecord(db.Model):
    expense_type = db.StringProperty()
    amount = db.IntegerProperty()

class ExpenseType(db.Model):
    name = db.StringProperty()
    total = db.IntegerProperty()

My datastore contains 100K instances of TxRecord and I'd like to summarise these by expense_type.

In SQL it would be something like:

select expense_type as name, sum(amount) as total 
    from TxRecord
    group by expense_type
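For reference, the same GROUP BY/SUM over an in-memory list can be sketched in plain Python (the record layout here is a hypothetical (expense_type, amount) tuple, not the datastore model):

```python
from collections import defaultdict

def summarise(records):
    """Group (expense_type, amount) pairs and sum the amounts,
    mirroring the SQL GROUP BY above."""
    totals = defaultdict(int)
    for expense_type, amount in records:
        totals[expense_type] += amount
    return dict(totals)

print(summarise([("travel", 10), ("food", 5), ("travel", 7)]))
# {'travel': 17, 'food': 5}
```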

What I'm currently doing is using the Python MapReduce framework to iterate over all of the TxRecords using the following mapper:

def generate_expense_type(rec):
    expense_type = ExpenseType.get_or_insert(rec.expense_type,
                                             name=rec.expense_type)
    expense_type.total += rec.amount

    yield op.db.Put(expense_type)

This seems to work, but I feel I have to run it with a shard_count of 1 to ensure the total isn't overwritten by concurrent writes.

Is there a strategy I can use to overcome this issue on AppEngine, or is that it?

Upvotes: 2

Views: 317

Answers (3)

Nick Johnson

Reputation: 101149

Using mapreduce is the right approach. Counters, as David suggests, are one option, but they're not reliable (they use memcache), and they're not designed for massive numbers of counters to be kept in parallel.

Your current mapreduce has a couple of issues: First, get_or_insert executes a datastore transaction every time it's called. Second, you then update the amount outside the transaction and asynchronously store it a second time, generating the concurrency issue you were concerned about.

At least until reduce is fully supported, your best option is to do the whole update in the mapper in a transaction, like this:

def generate_expense_type(rec):
    def _tx():
        expense_type = ExpenseType.get_by_key_name(rec.expense_type)
        if not expense_type:
            expense_type = ExpenseType(key_name=rec.expense_type,
                                       name=rec.expense_type, total=0)
        expense_type.total += rec.amount
        expense_type.put()
    db.run_in_transaction(_tx)

Upvotes: 3

Robert Kluin

Reputation: 8292

MapReduce is great for offline processing of data, and I like David's solution for handling the counters (+1 upvote).

I just wanted to mention another option: process the data as it comes in. Check out Brett Slatkin's High Throughput Data Pipelines on App Engine talk from IO 2010.

I've implemented the technique in a simple framework (slagg); you might find my example of grouping with date rollup useful.

Upvotes: 3

David Underhill

Reputation: 16243

Using the MapReduce framework is a good idea. You could use more than one shard if you utilize the counters provided by the MapReduce framework. So instead of modifying the datastore each time, you could do something like this:

yield op.counters.Increment("total_%s" % rec.expense_type, rec.amount)

After the MapReduce finishes (hopefully much more quickly than when you were using just one shard), then you can copy the finalized counters into your datastore entity.
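The final copy step can be sketched in plain Python, assuming the finalized counters arrive as a simple name-to-value mapping (the actual MapReduce API exposes them on the completed job's state, and the "total_" naming scheme is the one used in the yield above):

```python
def copy_counters_to_totals(counters):
    """Extract per-expense-type totals from finalized MapReduce
    counters named 'total_<expense_type>', ignoring the framework's
    own bookkeeping counters."""
    prefix = "total_"
    totals = {}
    for name, value in counters.items():
        if name.startswith(prefix):
            totals[name[len(prefix):]] = value
    return totals
```

Each entry in the returned dict would then be written to the matching ExpenseType entity in a single batch put, rather than once per input record.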

Upvotes: 3
