Grey Panther

Reputation: 13118

Do RPCs on Google Appengine support "wait with timeout"?

I'm trying to implement the following logic on Google Appengine:

rpc = call_external_service(timeout=T)
rpc.wait(timeout=T/2)
if rpc.done:
   return rpc.result
rpc2 = call_backup_service(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2], timeout=T/2)
return finished_rpc.result

That is, call a service (using urlfetch) with a timeout of T. If it doesn't finish in T/2, try calling a backup service and then wait for either of them to finish.

The problem is that the RPC mechanism doesn't seem to offer a "wait with timeout" primitive. That is, if I create an RPC with a deadline T, I can't say "wait for T/2 seconds and see if the RPC finished".

Does anyone have a workaround for this?


Edit: @TarunLalwani posted a potential solution. The idea is to have a special handler which sleeps for a predetermined amount of time (something like /sleep?delay=5) and to pass an RPC calling it as the second argument to UserRPC.wait_any. I.e. something like:

rpc = call_external_service(timeout=T)
rpc2 = create_wait_rpc(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2])
if finished_rpc == rpc:
  return rpc.result
rpc2 = call_backup_service(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2])
return finished_rpc.result

Unfortunately it seems that UserRPC.wait_any is implemented something like the following:

def wait_any(rpcs):
    last_rpc = rpcs[-1]
    last_rpc.wait()
    return last_rpc

That is, it always waits for the last RPC to finish, which is a problem in our case because if the initial call finishes in less than T/2 time, we would like to return the result immediately, rather than having to wait a minimum of T/2. I tested this with both the local dev_appserver and in production (the quick test code can be grabbed from https://github.com/cdman/gae-rpc-test).
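This "block on the last RPC" behavior can be reproduced with a plain-Python model (the `FakeRpc` class and `wait_any` below are toy stand-ins built on threads, not the SDK classes):

```python
import threading
import time

class FakeRpc:
    """Toy stand-in for UserRPC: completes after `delay` seconds."""
    def __init__(self, delay):
        self._event = threading.Event()
        threading.Timer(delay, self._event.set).start()

    @property
    def done(self):
        return self._event.is_set()

    def wait(self):
        self._event.wait()

def wait_any(rpcs):
    """Models the observed semantics: if nothing has finished yet,
    block on the *last* RPC in the list, ignoring the others."""
    for rpc in rpcs:
        if rpc.done:
            return rpc
    last = rpcs[-1]
    last.wait()
    return last

fast, slow = FakeRpc(0.1), FakeRpc(0.5)
start = time.time()
winner = wait_any([fast, slow])   # blocks on `slow`, the last element
elapsed = time.time() - start
# Even though `fast` finished after ~0.1s, we only return after ~0.5s.
```

With this semantics the fast RPC's early completion is simply never noticed, which is exactly the problem described above.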

This could still be made to work by using some extremely small timeout for rpc2, something like:

rpc = call_external_service(timeout=T)
end_time = time.time() + T/2
while time.time() < end_time:
    wait_any([rpc, create_wait_rpc(timeout=0.1)])
    if rpc.status == 2:
        return rpc.result
# else, call backup service

However, here I'm still artificially limiting my time resolution to 100ms (so if the initial call finishes in 230ms, we return the result only after 300ms) and I'll be spamming my logs with a lot of requests to /sleep. Also, this could increase the costs of running the project.

Alternatively, if there were some kind of no-op / low-overhead RPC that could be passed in as the second parameter of UserRPC.wait_any to keep the event loop moving, then this semi-busy-wait solution could perhaps work :-)


Edit 2: I implemented the busy-waiting version using the asynchronous version of memcache.get from ndb. You can take a look at the source here: https://github.com/cdman/gae-rpc-test/blob/ndb-async/main.py

Theoretically this should be free (see https://cloud.google.com/appengine/pricing#other-resources), but still feels like a hack.


Edit 3: It looks like the following should work:

from google.appengine.ext.ndb import eventloop
# ...
ev = eventloop.get_event_loop()
while time.time() < end_time:
  ev.run1()
  if rpc.done():
    break
  time.sleep(0.001)

(that is: run the event loop explicitly, check the RPC, and if it's not done, sleep a little and retry)

Unfortunately the "run the eventloop" step just blocks until the urlfetch completes at a certain point :(

Upvotes: 3

Views: 282

Answers (2)

Tarun Lalwani

Reputation: 146630

TL;DR:

After digging through the source of the Python App Engine SDK, below are my observations and takeaways.

wait_any is not what it looks like

When you call wait_any on two RPCs, you would expect to get back whichever finishes first, but the implementation doesn't work that way:

assert iter(rpcs) is not rpcs, 'rpcs must be a collection, not an iterator'
finished, running = cls.__check_one(rpcs)
if finished is not None:
  return finished
if running is None:
  return None
try:
  cls.__local.may_interrupt_wait = True
  try:
    running.__rpc.Wait()
  except apiproxy_errors.InterruptedError, err:
    err.rpc._exception = None
    err.rpc._traceback = None
finally:
  cls.__local.may_interrupt_wait = False
finished, running = cls.__check_one(rpcs)
return finished

In the below line of code

finished, running = cls.__check_one(rpcs)

The method __check_one looks like this:

rpc = None
for rpc in rpcs:
  assert isinstance(rpc, cls), repr(rpc)
  state = rpc.__rpc.state
  if state == apiproxy_rpc.RPC.FINISHING:
    rpc.__call_user_callback()
    return rpc, None
  assert state != apiproxy_rpc.RPC.IDLE, repr(rpc)
return None, rpc

So it just checks whether any of them is already finished; if not, it returns the last one in the collection (the final `return None, rpc`).
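The effect of __check_one can be seen with a small pure-Python model (the dict-based RPCs and the FINISHING/RUNNING strings are stand-ins, not the SDK's constants):

```python
FINISHING, RUNNING = 'finishing', 'running'

def check_one(rpcs):
    """Mirrors __check_one: return (finished, None) for the first RPC
    already in the FINISHING state; otherwise (None, last_rpc)."""
    rpc = None
    for rpc in rpcs:
        if rpc['state'] == FINISHING:
            return rpc, None
    return None, rpc

a = {'name': 'a', 'state': RUNNING}
b = {'name': 'b', 'state': RUNNING}
finished, running = check_one([a, b])
# Nothing is finished yet, so the *last* RPC is the one wait_any will block on.
print(finished, running['name'])   # None b

b['state'] = FINISHING
finished, running = check_one([a, b])
print(finished['name'])            # b
```

So the choice of which RPC gets waited on is purely positional, not based on which one is likely to finish first.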

Then wait_any calls running.__rpc.Wait() on that last RPC. So I created a simple sleep/delay handler to test this:

class SleepHandler(webapp2.RequestHandler):
    def get(self):
        delay = float(self.request.get('delay')) if self.request.get('delay') else 10
        sleep(delay)
        self.response.status_int = 200
        self.response.write('Response delayed by {}'.format(delay))

And added the below MainHandler to test the deadlines:

class MainHandler(webapp2.RequestHandler):
    def get(self):
        # rpc = UserRPC('dummywait', 5, stubmap=MyStubMap)

        rpc = urlfetch.create_rpc(deadline=2.0)
        rpc2 = urlfetch.create_rpc(deadline=6.0)
        urlfetch.make_fetch_call(rpc, self.request.host_url + "/sleep?delay=1")
        urlfetch.make_fetch_call(rpc2, self.request.host_url + "/sleep?delay=5")
        try:
            print(datetime.now())
            finished = apiproxy_stub_map.UserRPC.wait_any([rpc, rpc2])
            print(finished.request.url_)
            print(datetime.now())
            i = 0
        except Exception as ex:
            print_exception(ex)
        # ... do other things ...
        try:
            print(datetime.now())
            result = finished.get_result()
            print(datetime.now())
            if result.status_code == 200:
                text = result.content
                self.response.write(text)
            else:
                self.response.status_int = result.status_code
                self.response.write('URL returned status code {}'.format(
                    result.status_code))
        except urlfetch.DownloadError:
            print(datetime.now())
            self.response.status_int = 500
            self.response.write('Error fetching URL')


app = webapp2.WSGIApplication([
    ('/', MainHandler),
    ('/sleep', SleepHandler),
], debug=True)

The key points in the above code:

  • rpc has a deadline of 2.0 sec and the actual request finishes in 1.0
  • rpc2 has a deadline of 6.0 sec and the actual request finishes in 5.0

Ideally the expectation would be that we get rpc as the finished task and its data displayed in the response. But the output is

RPC response

Now if we switch the order of wait_any arguments from

finished = apiproxy_stub_map.UserRPC.wait_any([rpc, rpc2])

to

finished = apiproxy_stub_map.UserRPC.wait_any([rpc2, rpc])

the output changes to

RPC wins

This means that if you create RPCs with deadlines of T and T/2, your minimum waiting time will always be T/2 whenever the T/2 RPC is the last parameter.

So whatever solution you try in Google App Engine, it will still be a dirty trick. One possible trick is to poll in intervals. Below is one such sample:

    T = 10.0
    # Deadline T
    rpc_main = urlfetch.create_rpc(deadline=T)

    # Deadline T/2
    rpc_backup = urlfetch.create_rpc(deadline=T / 2)

    urlfetch.make_fetch_call(rpc_main, self.request.host_url + "/sleep?delay=7")

    i = 0.0

    while i < T / 2:
        rpc_compare = urlfetch.create_rpc()
        urlfetch.make_fetch_call(rpc_compare, self.request.host_url + "/sleep?delay=0.5")

        finished = apiproxy_stub_map.UserRPC.wait_any([rpc_main, rpc_compare])
        i += 0.5
        if finished == rpc_main:
            break

    if finished != rpc_main:
        # we need to fire a backup request
        urlfetch.make_fetch_call(rpc_backup, self.request.host_url + "/sleep?delay=1")

        finished = apiproxy_stub_map.UserRPC.wait_any([rpc_backup, rpc_main])

    try:
        finished.get_result()
    except DeadlineExceededError as ex:
        # rpc_main finished with an error, so switch to the backup request
        finished = rpc_backup

Here we give priority to rpc_main instead of the backup; even though the backup would finish first in this case, we get the response from rpc_main:

RPC Main

Now if I change rpc_main so that it misses its deadline:

    urlfetch.make_fetch_call(rpc_main, self.request.host_url + "/sleep?delay=20")

The output will change to

RPC Backup wins

So it demonstrates both polling and the worst-case wait scenario. This is the only workaround I have been able to work out from looking at the original source code.
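The interval trick can also be modelled without the SDK; here a `threading.Event` plays the role of rpc_main and a short `wait(0.1)` stands in for wait_any with a cheap timer RPC (toy names, not the urlfetch API):

```python
import threading

T = 1.0
main_done = threading.Event()
threading.Timer(0.3, main_done.set).start()   # "main" RPC finishes at ~0.3s

# Poll in 0.1s slices instead of blocking for the full T/2.
waited = 0.0
while waited < T / 2 and not main_done.is_set():
    main_done.wait(0.1)    # stands in for wait_any([rpc_main, short_timer_rpc])
    waited += 0.1

# Main finished inside its T/2 budget, so no backup call is needed.
result = 'main' if main_done.is_set() else 'backup'
```

The polling resolution (0.1s here) bounds the extra latency: a call finishing at 0.3s is noticed by 0.3s at the earliest and 0.4s at the latest, which mirrors the 100ms-granularity tradeoff described in the question.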

Upvotes: 4

Trib
Trib

Reputation: 46

You can set a custom request timeout by using the following urlfetch function:

urlfetch.set_default_fetch_deadline(value)

Note that this function stores the new default deadline in a thread-local variable, so it must be set for each request, for example in custom middleware. The value parameter is the deadline in seconds for the operation; the default is a system-specific deadline (typically 5 seconds).

The actual implementation will depend on your language but once you set a custom timeout you can easily set a value of deadline/2 for your call to the backup service.
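The thread-local behavior mentioned above can be illustrated in plain Python (the function names mimic the urlfetch API for readability, but this is a toy model, not the real implementation):

```python
import threading

_local = threading.local()

def set_default_fetch_deadline(value):
    """Store the default deadline on the current thread only,
    mirroring how urlfetch keeps its per-thread default."""
    _local.deadline = value

def get_default_fetch_deadline():
    # Fall back to a system-style default (5s) when unset on this thread.
    return getattr(_local, 'deadline', 5)

set_default_fetch_deadline(30)
print(get_default_fetch_deadline())   # 30 on this thread

seen = []
t = threading.Thread(target=lambda: seen.append(get_default_fetch_deadline()))
t.start(); t.join()
print(seen[0])                        # 5: the setting did not leak to the new thread
```

This is why the deadline must be re-set per request: each request may be served on a different thread, which starts with the unset (system) default.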

Upvotes: -1
