I'm trying to implement the following logic on Google Appengine:
rpc = call_external_service(timeout=T)
rpc.wait(timeout=T/2)
if rpc.done:
    return rpc.result
rpc2 = call_backup_service(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2], timeout=T/2)
return finished_rpc.result
That is, call a service (using urlfetch) with a timeout of T. If it doesn't finish in T/2, try calling a backup service and then wait for either of them to finish.
The problem is that the RPC mechanism doesn't seem to offer a "wait with timeout" primitive. That is, if I create an RPC with a deadline T, I can't say "wait for T/2 seconds and see if the RPC finished".
Does anyone have a workaround for this?
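For reference, outside of App Engine the pattern I'm after can be sketched with plain `concurrent.futures`; the `primary`/`backup` callables here are hypothetical stand-ins for the urlfetch calls, and this is exactly the "wait with timeout" primitive that the GAE RPC API doesn't expose:

```python
import concurrent.futures as cf

def hedged_call(primary, backup, timeout):
    # Start the primary call and wait up to timeout/2 for it.
    with cf.ThreadPoolExecutor(max_workers=2) as pool:
        f1 = pool.submit(primary)
        done, _ = cf.wait([f1], timeout=timeout / 2)
        if f1 in done:
            return f1.result()
        # Primary is slow: fire the backup and take whichever finishes first.
        f2 = pool.submit(backup)
        cf.wait([f1, f2], timeout=timeout / 2, return_when=cf.FIRST_COMPLETED)
        for f in (f1, f2):
            if f.done():
                return f.result()
        raise TimeoutError('neither service answered within the deadline')
```

`cf.wait` accepts a timeout and `FIRST_COMPLETED`, which together give both the "wait for T/2" step and the "wait for either" step.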
Edit: @TarunLalwani posted a potential solution. The idea is to have a special handler which sleeps for a predetermined amount of time (something like /sleep?delay=5) and to pass that as the second parameter to UserRPC.wait_any. I.e. something like:
rpc = call_external_service(timeout=T)
rpc2 = create_wait_rpc(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2])
if finished_rpc == rpc:
    return rpc.result
rpc2 = call_backup_service(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2])
return finished_rpc.result
Unfortunately it seems that UserRPC.wait_any is implemented something like the following:
def wait_any(rpcs):
    last_rpc = rpcs[-1]
    last_rpc.wait()
    return last_rpc
That is, it always waits for the last RPC to finish, which is a problem in our case: if the initial call finishes in less than T/2, we would like to return the result immediately rather than waiting a minimum of T/2. I tested this both locally with dev_appserver and in production (the quick test code can be grabbed from https://github.com/cdman/gae-rpc-test).
This could still be made to work by using some extremely small timeout for rpc2, something like:
rpc = call_external_service(timeout=T)
end_time = time.time() + T/2
while time.time() < end_time:
    wait_any([rpc, create_wait_rpc(timeout=0.1)])
    if rpc.status == 2:  # 2 == apiproxy_rpc.RPC.FINISHING
        return rpc.result
# else, call backup service
However, here I'm still artificially limiting my time resolution to 100ms (so if the initial call finishes in 230ms, we return the result only after 300ms), and I'll be spamming my logs with a lot of requests to /sleep. Also, this could increase the cost of running the project.
Alternatively, if there were some kind of no-op / low-overhead RPC that could be passed as the second parameter of UserRPC.wait_any to keep the event loop moving, then this semi-busy-wait solution could perhaps work :-)
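Stripped of the GAE specifics, the semi-busy-wait above boils down to this generic polling helper (my own sketch; substitute any cheap async operation for the sleep "ticker"):

```python
import time

def poll_until(done, deadline, tick=0.1):
    # Poll `done()` every `tick` seconds until it returns True or
    # `deadline` seconds elapse. The achievable resolution is `tick`,
    # which is exactly the 100ms-granularity problem described above.
    end = time.time() + deadline
    while time.time() < end:
        if done():
            return True
        time.sleep(tick)
    return done()
```

Shrinking `tick` improves latency but multiplies the number of wake-ups (and, on GAE, the number of /sleep requests and log lines).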
Edit 2: I implemented the busy-waiting version using the asynchronous version of memcache.get from ndb. You can take a look at the source here: https://github.com/cdman/gae-rpc-test/blob/ndb-async/main.py. Theoretically this should be free (see https://cloud.google.com/appengine/pricing#other-resources), but it still feels like a hack.
Edit 3: It looks like the following should work:
from google.appengine.ext.ndb import eventloop
# ...
ev = eventloop.get_event_loop()
while time.time() < end_time:
    ev.run1()
    if rpc.done():
        break
    time.sleep(0.001)
(That is: run the event loop explicitly, check the RPC, and if it's not done, sleep a little and retry.)
Unfortunately, at some point the "run the event loop" step just blocks until the urlfetch completes :(
Upvotes: 3
TL;DR
After digging through the source of the Python App Engine SDK, below are my observations and takeaways.
wait_any is not what it looks like
When you use wait_any on 2 RPCs, you would expect to get back whichever one finishes first, but the logic doesn't work that way:
assert iter(rpcs) is not rpcs, 'rpcs must be a collection, not an iterator'
finished, running = cls.__check_one(rpcs)
if finished is not None:
    return finished
if running is None:
    return None
try:
    cls.__local.may_interrupt_wait = True
    try:
        running.__rpc.Wait()
    except apiproxy_errors.InterruptedError, err:
        err.rpc._exception = None
        err.rpc._traceback = None
finally:
    cls.__local.may_interrupt_wait = False
finished, running = cls.__check_one(rpcs)
return finished
The interesting part is the line
finished, running = cls.__check_one(rpcs)
The code of the method __check_one looks like this:
rpc = None
for rpc in rpcs:
    assert isinstance(rpc, cls), repr(rpc)
    state = rpc.__rpc.state
    if state == apiproxy_rpc.RPC.FINISHING:
        rpc.__call_user_callback()
        return rpc, None
    assert state != apiproxy_rpc.RPC.IDLE, repr(rpc)
return None, rpc
So it just checks whether any one of them is already finished, and if not, it returns the last one in the collection (the final return None, rpc).
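In other words, a simplified pure-Python model of this behaviour (my own paraphrase, not the SDK code) would be:

```python
class FakeRPC(object):
    """Stand-in for UserRPC; `finished` mimics the FINISHING state."""
    def __init__(self, name, finished=False):
        self.name = name
        self.finished = finished

def check_one(rpcs):
    # Return (finished_rpc, None) if any RPC is already done,
    # otherwise (None, <last rpc in the collection>).
    rpc = None
    for rpc in rpcs:
        if rpc.finished:
            return rpc, None
    return None, rpc
```

Since wait_any then blocks on whatever check_one returns as "running", it always blocks on the last RPC in the collection when none has finished yet.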
Then wait_any makes a call to running.__rpc.Wait(). So I created a simple sleep/delay handler for testing:
class SleepHandler(webapp2.RequestHandler):
    def get(self):
        delay = float(self.request.get('delay')) if self.request.get('delay') else 10
        sleep(delay)
        self.response.status_int = 200
        self.response.write('Response delayed by {}'.format(delay))
And added the MainHandler below to test the deadline:
class MainHandler(webapp2.RequestHandler):
    def get(self):
        # rpc = UserRPC('dummywait', 5, stubmap=MyStubMap)
        rpc = urlfetch.create_rpc(deadline=2.0)
        rpc2 = urlfetch.create_rpc(deadline=6.0)
        urlfetch.make_fetch_call(rpc, self.request.host_url + "/sleep?delay=1")
        urlfetch.make_fetch_call(rpc2, self.request.host_url + "/sleep?delay=5")
        try:
            print(datetime.now())
            finished = apiproxy_stub_map.UserRPC.wait_any([rpc, rpc2])
            print(finished.request.url_)
            print(datetime.now())
            i = 0
        except Exception as ex:
            print_exception(ex)
        # ... do other things ...
        try:
            print(datetime.now())
            result = finished.get_result()
            print(datetime.now())
            if result.status_code == 200:
                text = result.content
                self.response.write(text)
            else:
                self.response.status_int = result.status_code
                self.response.write('URL returned status code {}'.format(
                    result.status_code))
        except urlfetch.DownloadError:
            print(datetime.now())
            self.response.status_int = 500
            self.response.write('Error fetching URL')


app = webapp2.WSGIApplication([
    ('/', MainHandler),
    ('/sleep', SleepHandler),
], debug=True)
The key points in the above code:
- rpc has a deadline of 2.0 sec and the actual request finishes in 1.0 sec
- rpc2 has a deadline of 6.0 sec and the actual request finishes in 5.0 sec
Ideally the expectation would be that we get rpc back as the finished task and its data displayed in the response. But the output is
Now if we switch the order of the wait_any arguments from
finished = apiproxy_stub_map.UserRPC.wait_any([rpc, rpc2])
to
finished = apiproxy_stub_map.UserRPC.wait_any([rpc2, rpc])
the output changes to below
So this means that if you create deadlines of T and T/2, your minimal waiting time will always be T/2 if that RPC is passed as the last parameter.
So whatever solution you try on Google App Engine, it will still be a dirty trick. One possible trick is to poll in intervals. Below is one such sample:
T = 10.0
# Deadline T
rpc_main = urlfetch.create_rpc(deadline=T)
# Deadline T/2
rpc_backup = urlfetch.create_rpc(deadline=T / 2)
urlfetch.make_fetch_call(rpc_main, self.request.host_url + "/sleep?delay=7")
i = 0.0
while i < T / 2:
    rpc_compare = urlfetch.create_rpc()
    urlfetch.make_fetch_call(rpc_compare, self.request.host_url + "/sleep?delay=0.5")
    finished = apiproxy_stub_map.UserRPC.wait_any([rpc_main, rpc_compare])
    i += 0.5
    if finished == rpc_main:
        break
if finished != rpc_main:
    # we need to fire a backup request
    urlfetch.make_fetch_call(rpc_backup, self.request.host_url + "/sleep?delay=1")
    finished = apiproxy_stub_map.UserRPC.wait_any([rpc_backup, rpc_main])
try:
    finished.get_result()
except DeadlineExceededError as ex:
    # rpc_main finished with an error, so we switch to the backup request
    finished = rpc_backup
Here we give priority to rpc_main instead of the backup; even though the backup would finish first in this case, we get the response from rpc_main.
Now if I change rpc_main so that it misses its deadline:
urlfetch.make_fetch_call(rpc_main, self.request.host_url + "/sleep?delay=20")
The output will change to below.
So it shows both the polling and the worst-case wait scenario. This is the only workaround/implementation I have been able to work out from looking at the original source code.
Upvotes: 4
You can set a custom request timeout by using the following urlfetch function:
urlfetch.set_default_fetch_deadline(value)
Note that this function stores the new default deadline on a thread-local variable, so it must be set for each request, for example in a custom middleware. The value parameter is the deadline in seconds for the operation; the default is a system-specific deadline (typically 5 seconds).
The actual implementation will depend on your language, but once you set a custom timeout you can easily set a value of deadline/2 for your call to the backup service.
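To illustrate the thread-local point, here is a plain-Python model of the documented behaviour (my own sketch, not the SDK source; the real function is google.appengine.api.urlfetch.set_default_fetch_deadline):

```python
import threading

# The default deadline lives on a thread-local, so each request
# thread must set it for itself.
_local = threading.local()

def set_default_fetch_deadline(value):
    _local.deadline = value

def get_default_fetch_deadline():
    # Fall back to the system default, typically 5 seconds.
    return getattr(_local, 'deadline', 5)

T = 8.0
set_default_fetch_deadline(T / 2)  # deadline/2 for the backup call
```

A thread that never called set_default_fetch_deadline still sees the 5-second system default, which is why middleware that runs on every request is the usual place to set it.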
Upvotes: -1