Matt Hamilton

Reputation: 815

Avoiding duplicate tasks (or dealing with them) in task queue in Google App Engine

I have a service I've developed running on GAE. The application needs to 'tick' every 3 seconds to perform a bunch of calculations. It is a simulation-type game.

I have a manually scaled instance that I start, which uses the deferred API and task queue like so (some error handling etc. removed for clarity):

from google.appengine.ext import deferred

@app.route('/_ah/start')
def start():
    log.info('Ticker instance started')
    return tick()

@app.route('/tick')
def tick():
    _do_tick()
    deferred.defer(tick, _countdown=3)
    return 'Tick!', 200

The problem is that sometimes I end up with this being scheduled twice for some reason (likely a transient error/timeout causing the task to be rescheduled) and I end up with multiple tasks in the task queue, and the game ticking multiple times per 3-second period.

Any ideas how best to deal with this?

As far as I can see, you can't ask a queue 'Are there tasks of X already there?' or 'How many items are on the queue at the moment?'.
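The closest thing I've come across is task names: the queue rejects a task whose name was used recently, so naming each tick after its time slot might catch double-scheduling. A rough sketch (the per-slot naming scheme is just my guess, and I'm not sure it covers the retry case):

# Sketch: dedupe by naming each tick after its 3-second window.
# Adding a second task with the same name raises rather than enqueueing.
import time
from google.appengine.api import taskqueue

def schedule_tick():
    slot = int(time.time()) // 3  # one name per 3-second window (assumed)
    try:
        taskqueue.add(url='/tick', name='tick-%d' % slot, countdown=3)
    except (taskqueue.TaskAlreadyExistsError,
            taskqueue.TombstonedTaskError):
        pass  # a tick for this window is already scheduled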

I understand that this uses a push queue, and one idea might be to switch instead to a pull queue and have the ticker lease items off the queue, grouped by tag, which would get all of them, including duplicates. Would that be better?
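If I went that route, I imagine the worker would look something like this (a sketch; the queue name and tag are assumptions):

# Sketch: lease every pending tick (duplicates included) from a pull
# queue, run a single tick, then delete the whole batch.
from google.appengine.api import taskqueue

def drain_ticks():
    q = taskqueue.Queue('tickpull')  # assumed pull-queue name
    tasks = q.lease_tasks_by_tag(lease_seconds=10, max_tasks=100, tag='tick')
    if tasks:
        _do_tick()             # run once, however many duplicates were leased
        q.delete_tasks(tasks)  # remove all of them, duplicates included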

In essence what I really want is just a cron-like scheduler to schedule something every 3 seconds, but I know that the scheduler on GAE likely doesn't run to that resolution.

I could just move everything into the startup handler, e.g.:

from time import sleep

@app.route('/_ah/start')
def start():
    log.info('Ticker instance started')
    while True:
        _do_tick()
        sleep(3)
    return 200  # unreachable; the loop never exits

But from what I've seen, the logs won't update while this runs, because it's treated as a single request that never completes. That makes it harder to see in the logs what is going on. Currently I see each individual tick as a separate request log entry.

Also, if the above gets killed, I'd need it to reschedule itself anyway. That might not be too much of a hassle: I know there are exceptions you can catch when the instance is about to be shut down, so I could fire off a deferred task to start it again.
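Something like the runtime shutdown hook, maybe (a sketch; restart_ticker is a hypothetical deferred function that kicks the loop off again):

# Sketch: hand the loop off before this instance dies.
from google.appengine.api import runtime
from google.appengine.ext import deferred

def _on_shutdown():
    log.info('Instance shutting down; deferring a restart')
    deferred.defer(restart_ticker)  # hypothetical restart helper

runtime.set_shutdown_hook(_on_shutdown)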

Or is there a better way to handle this on GAE?

Upvotes: 0

Views: 1989

Answers (2)

Matt Hamilton

Reputation: 815

I can't see a way to detect/eliminate duplicates, but I've worked around it now using a different mechanism. Rather than rely on the task queue as a scheduler, I run my own scheduler loop in a manually scaled instance:

from time import sleep

from google.appengine.api import taskqueue

TICKINTERVAL = 3

@app.route('/_ah/start')
def scheduler():
    log.info('Ticker instance started')
    while True:
        if game.is_running():
            # Enqueue a tick on a dedicated queue, handled by a
            # separate 'tickworker' module
            taskqueue.add(
                url='/v1/game/tick',
                queue_name='tickqueue',
                method='PUT',
                target='tickworker',
                )
        else:
            log.info('Tick skipped as game stopped')
        db_session.rollback()  # discard stale state before the next check
        sleep(TICKINTERVAL)

I have defined my own queue, tickqueue, in queue.yaml:

queue:
- name: tickqueue
  rate: 5/s
  max_concurrent_requests: 1
  retry_parameters:
    task_retry_limit: 0
    task_age_limit: 1m

The queue doesn't retry tasks, and any tasks left on it longer than a minute get cancelled. I set max_concurrent_requests to 1 so that it only attempts to process one item at a time.

If an occasional 'tick' takes longer than 3 seconds then it will back up on the queue, but the queue should clear if it speeds up again. If ticks end up taking longer than 3s on average then the tasks that have been on the queue longer than a minute will get discarded.

This has the advantage that I get a log entry for each tick (and it's called at /v1/game/tick, so it's easy to spot, as opposed to /_ah/deferred). The downside is that I need one instance for the scheduler and one for the worker, since you can't have the scheduler instance process requests: it won't serve them until /_ah/start completes, which it never does here.
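For reference, the worker runs as its own module; its config might look something like this (a sketch in the style of queue.yaml above; the module name comes from target='tickworker', everything else is a guess):

# tickworker.yaml
module: tickworker
runtime: python27
api_version: 1
threadsafe: true
handlers:
- url: /v1/game/.*
  script: main.app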

Upvotes: 1

Dan Cornilescu

Reputation: 39824

You can set the task_retry_limit to 0 in the optional _retry_options argument, as mentioned in https://stackoverflow.com/a/36621588/4495081.
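Applied to the question's deferred call, that would look something like this (a sketch):

# Never retry a failed tick; a duplicate tick is worse than a missed one.
from google.appengine.api import taskqueue
from google.appengine.ext import deferred

no_retry = taskqueue.TaskRetryOptions(task_retry_limit=0)
deferred.defer(tick, _countdown=3, _retry_options=no_retry)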

The trouble is that if the task fails for a legitimate reason, the ticking job stops forever. You may also want to keep track of the last time the job executed and have a cron-based sanity-check job that periodically verifies the ticker is still running and restarts it if not.
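A sketch of that sanity check (the memcache key, threshold, and handler path are all assumptions; a cron.yaml entry would hit the handler every minute or so):

import time
from google.appengine.api import memcache

STALE_AFTER = 30  # seconds without a tick before declaring it dead (assumed)

def record_tick():
    # Call from the tick handler after each successful tick.
    memcache.set('last_tick', time.time())

@app.route('/cron/check_ticker')
def check_ticker():
    last = memcache.get('last_tick')
    if last is None or time.time() - last > STALE_AFTER:
        log.warning('Ticker appears dead; restarting it')
        restart_ticker()  # hypothetical restart helper
    return 'OK', 200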

Upvotes: 0
