Nick Heiner

Reputation: 122412

Google App Engine: How to use the task queue for this processing?

I'm using the Python GAE SDK.

I have some processing that needs to be done on 6000+ instances of MyKind. It is too slow to be done in a single request, so I'm using the task queue. If I make a single task process only one entity, then it should take only a few seconds.

The documentation says that only 100 tasks can be added in a "batch". (What do they mean by that? In one request? In one task?)

So, assuming that "batch" means "request", I'm trying to figure out what the best way is to create a task for each entity in the datastore. What do you think?

It's easier if I can assume that the order of MyKind will never change. (The processing will never actually change the MyKind instances - it only creates new instances of other types.) I could just make a bunch of tasks, giving each one an offset of where to start, spaced less than 100 apart. Then, each task could create individual tasks that do the actual processing.

But what if there are so many entities that the original request can't add all the necessary scheduling tasks? This makes me think I need a recursive solution - each task looks at the range it is given. If only one element is present in the range, it does processing on it. Otherwise, it subdivides the range further into subsequent tasks.
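Something like this untested sketch is what I have in mind (the /range-worker handler and process() are placeholders):

    from google.appengine.api import taskqueue

    def handle_range(start, end):
        # Each task is responsible for the MyKind instances at offsets [start, end).
        if end - start <= 1:
            results = MyKind.all().fetch(1, offset=start)
            if results:
                process(results[0])  # the actual per-entity processing
        else:
            # Split the range in half and hand each half to a new task.
            mid = (start + end) // 2
            taskqueue.add(url='/range-worker', params={'start': start, 'end': mid})
            taskqueue.add(url='/range-worker', params={'start': mid, 'end': end})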

If I can't count on using offsets and limits to identify entities (because their ordering isn't ensured to be constant), maybe I could just use their keys? But then I could be sending 1000s of keys around, which seems unwieldy.

Am I going down the right path here, or is there another design I should consider?

Upvotes: 3

Views: 2013

Answers (2)

Ken

Reputation: 1110

Also, depending on your design, you can do what I did, which is to number all of the records that need to be processed. I process about 3,500 items, each taking 3 seconds or so. To avoid overlap and timeouts, and to allow for future growth, my first task gets the list of all the unique items of that kind from the database. It then divides that list into chunks of 500 identifiers, looping until every unique item in my database is accounted for, and posts each chunk of 500 identifiers to a second tier of handler tasks. Each of those second-tier handler tasks (currently seven or eight of them) gets a unique list of 500 items and adds 500 tasks, one for each unique identifier.

Since it's all managed through loops and counting based on the number of unique items in my database, I can add as many unique items as I want, and the number of tasks will expand to accommodate them with no duplication at all. I use it for tracking prices in a game on a daily basis, so it's all fired off by a cron job and requires no intervention on my part.
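In rough outline, the shape of it is something like this (handler URLs are illustrative, and MyKind stands in for the kind being processed):

    from google.appengine.api import taskqueue

    CHUNK_SIZE = 500

    def kick_off():
        # First-tier task: gather every unique identifier (keys-only keeps it cheap),
        # then hand one chunk of 500 identifiers to each second-tier task.
        keys = [str(k) for k in MyKind.all(keys_only=True)]
        for i in range(0, len(keys), CHUNK_SIZE):
            chunk = keys[i:i + CHUNK_SIZE]
            taskqueue.add(url='/chunk-worker', params={'keys': ','.join(chunk)})

    def fan_out(keys_param):
        # Second-tier task: enqueue one processing task per identifier.
        for key in keys_param.split(','):
            taskqueue.add(url='/process-one', params={'key': key})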

Upvotes: 0

Drew Sears

Reputation: 12838

When you run code like taskqueue.add(url='/worker', params={'cursor': cursor}), you are enqueueing a task: scheduling a request to execute out of band using the parameters you provide. You can schedule up to 100 of these in one batch operation.
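For example, a batch add looks roughly like this (sketch only; the queue name and key list are placeholders):

    from google.appengine.api import taskqueue

    # Queue.add() accepts a list of up to 100 Task objects in a single call.
    queue = taskqueue.Queue('default')
    tasks = [taskqueue.Task(url='/worker', params={'key': str(key)})
             for key in some_keys[:100]]  # some_keys is a placeholder list of keys
    queue.add(tasks)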

I don't think you want to, though. Task chaining would make this a lot simpler:

Your worker task would do something like this:

  • Run a query to fetch some records for processing. If a cursor was provided in the task params, use it. Limit the query to 10 records, or whatever you think can finish in 30 seconds.

  • Process your 10 records

  • If your query returned 10 records, enqueue another task and pass it the updated cursor from your query so it can pick up where you left off.

  • If you got fewer than 10 records, you're done. Hooray! Fire off an email or something and quit.

With this route, you only need to kick off the first task, and the rest add themselves.
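Roughly, the worker handler could look like this (a sketch only; MyKind and process() stand in for your model and your per-record work):

    from google.appengine.api import taskqueue
    from google.appengine.ext import webapp

    BATCH_SIZE = 10  # whatever you think can finish comfortably within the deadline

    class Worker(webapp.RequestHandler):
        def post(self):
            query = MyKind.all()
            cursor = self.request.get('cursor')
            if cursor:
                query.with_cursor(cursor)  # resume where the previous task stopped
            records = query.fetch(BATCH_SIZE)
            for record in records:
                process(record)  # placeholder for the real work
            if len(records) == BATCH_SIZE:
                # There may be more to do: chain the next task with the new cursor.
                taskqueue.add(url='/worker', params={'cursor': query.cursor()})
            # Fewer than BATCH_SIZE records means the chain is finished.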

Note that if a task fails, App Engine will retry it until it succeeds, so you don't need to worry about a datastore hiccup causing one task to timeout and break the chain.

Edit:

The steps above do not guarantee that an entity will be processed only once. Tasks should generally run only once, but Google does advise you to design for idempotence. If it's a major concern, here's one way to handle it:

  • Put a status flag on each entity to be processed, or create a complementary entity to hold the flag. It should have states akin to Pending, Processing, and Processed.

  • When you fetch a new entity to process, transactionally lock and increment the processing flag. Only run the entity if it's Pending. When processing finishes, increment the flag again.

Note that it's not strictly necessary to add the processing flag to every entity before you start. Your "pending" state can just mean the property or corresponding entity doesn't exist yet.
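Here's a rough sketch of one way to do that (the ProcessingStatus model and claim() helper are illustrative names, not an established pattern from the docs):

    from google.appengine.ext import db

    class ProcessingStatus(db.Model):
        # One status entity per MyKind entity, keyed by that entity's key,
        # so the absence of a ProcessingStatus means "pending".
        state = db.StringProperty(default='pending',
                                  choices=('pending', 'processing', 'processed'))

    def claim(entity_key):
        # Transactionally move the flag from pending to processing.
        # Returns True only if this task won the claim and should do the work.
        def txn():
            status = ProcessingStatus.get_by_key_name(str(entity_key))
            if status is None:
                status = ProcessingStatus(key_name=str(entity_key))
            if status.state != 'pending':
                return False
            status.state = 'processing'
            status.put()
            return True
        return db.run_in_transaction(txn)

When the processing itself finishes, advance the flag to 'processed' in the same way.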

Upvotes: 9
