Reputation: 11575
I'm trying to implement a simple gevent setup. There is a sender that should send a value to several waiters in parallel. The Event
class gets the closest to solving this, shown below.
Every three seconds, the setter creates an event, which unblocks all the waiters. The event is cleared right after, so the waiters block again until the next time.
import gevent
from gevent.event import Event
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
while True:
gevent.sleep(3)
evt.set()
evt.clear()
def waiter(arg):
while True:
evt.wait()
print("waiter {}".format(arg))
def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter,1),
gevent.spawn(waiter,2),
gevent.spawn(waiter,3),
])
if __name__ == '__main__': main()
Now, I only need to do this, with the addition of passing a value to the waiters. The obvious choice would be to use an AsyncResult
. Yet, it is not possible to clear
the AsyncResult object, so the waiters end up in an infinite loop.
Do you have any ideas how to implement this?
Upvotes: 4
Views: 1341
Reputation: 22268
you can add this behavior in a derived class from Event itself.
Here we added a get()
method that simply uses wait()
and also setval()
that uses set()
import gevent
from gevent.event import Event
class Evt(Event):
def __init__(self):
super().__init__()
self._val = None
def setval(self, val):
self._val = val
self.set()
def get(self):
self.wait()
return self._val
evt = Evt()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
n = 0
while True:
gevent.sleep(3)
evt.setval(n)
evt.clear()
n += 1
def waiter(arg):
while True:
val = evt.get()
print("waiter {}: val = {}".format(arg, val))
def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter, 1),
gevent.spawn(waiter, 2),
gevent.spawn(waiter, 3),
])
if __name__ == '__main__':
main()
Upvotes: 0
Reputation: 2912
why not just use a mutable object to store whatever you want to send, and pass a reference into the setters and waiters. the setter can just change it to whatever before calling set? see below:
import gevent
from gevent.event import Event
evt = Event()
def setter(arg):
'''After 3 seconds, wake all threads waiting on the value of arg['it']'''
while True:
gevent.sleep(3)
arg['it'] += 1
evt.set()
evt.clear()
def waiter(num, arg):
while True:
evt.wait()
print("waiter {} {}".format(num, arg['it']))
def main():
THING = {'it': 1}
gevent.joinall([
gevent.spawn(setter, THING),
gevent.spawn(waiter, 1, THING),
gevent.spawn(waiter, 2, THING),
gevent.spawn(waiter, 3, THING),
])
if __name__ == '__main__': main()
output:
waiter 2 2
waiter 1 2
waiter 3 2
waiter 2 3
waiter 1 3
waiter 3 3
waiter 2 4
waiter 1 4
waiter 3 4
Upvotes: 0
Reputation: 94901
I think your best bet is to use queues for this. I created a BroadcastQueue
class which makes it a little easier to manage sending one value to many consumers. The producer calls BroadcastQueue.broadcast()
, which will send a value to all the registered consumers. Consumers register by calling BroadcastQueue.register
, which returns a unique gevent.queue.Queue()
object. The consumers then use that object to get
messages from the producer.
import gevent
from gevent.queue import Queue
class BroadcastQueue(object):
def __init__(self):
self._queues = []
def register(self):
q = Queue()
self._queues.append(q)
return q
def broadcast(self, val):
for q in self._queues:
q.put(val)
def setter(bqueue):
'''After 3 seconds, wake all threads waiting on the value of evt'''
while True:
gevent.sleep(3)
bqueue.broadcast("hi")
def waiter(arg, bqueue):
queue = bqueue.register()
while True:
val = queue.get()
print("waiter {} {}".format(arg, val))
def main():
bqueue = BroadcastQueue()
gevent.joinall([
gevent.spawn(setter, bqueue),
gevent.spawn(waiter, 1, bqueue),
gevent.spawn(waiter, 2, bqueue),
gevent.spawn(waiter, 3, bqueue),
])
if __name__ == '__main__':
main()
Output:
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
Upvotes: 1