Reputation: 14292
Currently I am using ZeroRPC: I have "workers" connect to the "server" and do the work that the server sends them.
Calls are made over ZeroRPC as soon as there is a call to make; as far as I can tell it uses a FIFO queue.
I would like to use my own queue so that I can throttle/prioritize the calls.
I'm hoping that ZeroRPC exposes a gevent Event that triggers when its internal queue runs empty.
Upvotes: 2
Views: 1137
Reputation: 1035
I haven't tried this myself, but I have read through the source; I'm motivated because I want to do the same thing.
It seems like what you would do is inherit from zerorpc.Server and override the _acceptor method. According to the source, _acceptor is what receives messages and then spawns greenlets to run them. So if you change that loop to feed your own queue instead, you can use it to throttle.
Upvotes: 1
Reputation: 633
What you want to do is create your own work queue in your server, and dispatch the calls yourself in the priorities you wish.
Since a few lines of code express more than any vampire story in 3 volumes, let's see in pseudo code what the server could look like:
import gevent
from gevent.event import AsyncResult

myqueue = MySuperBadAssQueue()

def myqueueprocessor():
    for request in myqueue:            # blocks until next request
        gevent.spawn(request.process)  # do the job asynchronously

gevent.spawn(myqueueprocessor)         # do that at startup


class Server:

    def dosomething(self, args...blabla...):  # what users are calling
        request = Request(args...blabla...)
        myqueue.put(request)           # something to do buddy!
        return request.future.get()    # return when request is completed
                                       # (can also raise an exception)


# An example of what a request could look like:
class Request:

    def __init__(self, ....blablabla...):
        self.args = ...                    # keep whatever the worker needs
        self.future = AsyncResult()        # completed (or failed) later

    def process(self):
        try:
            result = someworker(*self.args)   # call some worker
            self.future.set(result)           # complete the initial request
        except Exception as e:
            self.future.set_exception(e)
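For completeness, a sketch of how such a Server could be hooked up; expose it with zerorpc as usual (the endpoint here is just an example):

import zerorpc

s = zerorpc.Server(Server())       # expose the methods over zerorpc as usual
s.bind("tcp://0.0.0.0:4242")       # example endpoint
s.run()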
It's up to MySuperBadAssQueue to do all the smart work: throttle if you want, cancel out a request with an exception if necessary, etc...
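As an illustration only (the class name, priorities and rate limit are made up for this example), such a queue could be built on gevent's PriorityQueue:

import itertools
import gevent
import gevent.queue

class MySuperBadAssQueue(object):
    # Hypothetical implementation: a priority queue with a crude rate limit.
    # Lower priority numbers are served first.
    def __init__(self, max_per_second=10):
        self._queue = gevent.queue.PriorityQueue()
        self._counter = itertools.count()   # tie-breaker for equal priorities
        self._delay = 1.0 / max_per_second

    def put(self, request, priority=100):
        self._queue.put((priority, next(self._counter), request))

    def __iter__(self):
        while True:
            _, _, request = self._queue.get()   # blocks until something arrives
            yield request
            gevent.sleep(self._delay)           # throttle between dispatches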
ZeroRPC does not expose any event to let you know if its 'internal' queue runs empty:
In fact, there is no explicit queue in ZeroRPC. What happens is simply first come, first served, and the exact order depends on both ZeroMQ and the gevent IOLoop (libevent or libev, depending on the version). In practice, this conveniently behaves like a FIFO queue.
Upvotes: 3