user1496984
user1496984

Reputation: 11575

gevent - Pass a value to many greenlets in parallel

I'm trying to implement a simple gevent setup. There is a sender that should send a value to several waiters in parallel. The Event class gets the closest to solving this, shown below.

Every three seconds, the setter creates an event, which unblocks all the waiters. The event is cleared right after, so the waiters block again until the next time.

import gevent
from gevent.event import Event

evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    while True:
        gevent.sleep(3)
        evt.set()
        evt.clear()

def waiter(arg):
    while True:
        evt.wait()
        print("waiter {}".format(arg))

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter,1),
        gevent.spawn(waiter,2),
        gevent.spawn(waiter,3),
    ])

if __name__ == '__main__': main()

Now, I only need to do this, with the addition of passing a value to the waiters. The obvious choice would be to use an AsyncResult. Yet, it is not possible to clear the AsyncResult object, so the waiters end up in an infinite loop.

Do you have any ideas how to implement this?

Upvotes: 4

Views: 1341

Answers (3)

iman
iman

Reputation: 22268

you can add this behavior in a derived class from Event itself.

Here we added a get() method that simply uses wait() and also setval() that uses set()

import gevent
from gevent.event import Event

class Evt(Event):

    def __init__(self):
        super().__init__()
        self._val = None

    def setval(self, val):
        self._val = val
        self.set()

    def get(self):
        self.wait()
        return self._val

evt = Evt()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    n = 0
    while True:
        gevent.sleep(3)
        evt.setval(n)
        evt.clear()
        n += 1

def waiter(arg):
    while True:
        val = evt.get()
        print("waiter {}: val = {}".format(arg, val))

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter, 1),
        gevent.spawn(waiter, 2),
        gevent.spawn(waiter, 3),
    ])

if __name__ == '__main__':
    main()

Upvotes: 0

domoarigato
domoarigato

Reputation: 2912

why not just use a mutable object to store whatever you want to send, and pass a reference into the setters and waiters. the setter can just change it to whatever before calling set? see below:

import gevent
from gevent.event import Event

evt = Event()

def setter(arg):
    '''After 3 seconds, wake all threads waiting on the value of arg['it']'''
    while True:
        gevent.sleep(3)
        arg['it'] += 1
        evt.set()
        evt.clear()

def waiter(num, arg):
    while True:
        evt.wait()
        print("waiter {} {}".format(num, arg['it']))

def main():
    THING = {'it': 1}
    gevent.joinall([
        gevent.spawn(setter, THING),
        gevent.spawn(waiter, 1, THING),
        gevent.spawn(waiter, 2, THING),
        gevent.spawn(waiter, 3, THING),
    ])

if __name__ == '__main__': main()

output:

waiter 2 2
waiter 1 2
waiter 3 2
waiter 2 3
waiter 1 3
waiter 3 3
waiter 2 4
waiter 1 4
waiter 3 4

Upvotes: 0

dano
dano

Reputation: 94901

I think your best bet is to use queues for this. I created a BroadcastQueue class which makes it a little easier to manage sending one value to many consumers. The producer calls BroadcastQueue.broadcast(), which will send a value to all the registered consumers. Consumers register by calling BroadcastQueue.register, which returns a unique gevent.queue.Queue() object. The consumers then use that object to get messages from the producer.

import gevent
from gevent.queue import Queue


class BroadcastQueue(object):
    def __init__(self):
        self._queues = []

    def register(self):
        q = Queue()
        self._queues.append(q)
        return q

    def broadcast(self, val):
        for q in self._queues:
            q.put(val)


def setter(bqueue):
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    while True:
        gevent.sleep(3)
        bqueue.broadcast("hi")

def waiter(arg, bqueue):
    queue = bqueue.register()
    while True:
        val = queue.get()
        print("waiter {} {}".format(arg, val))

def main():
    bqueue = BroadcastQueue()
    gevent.joinall([
        gevent.spawn(setter, bqueue),
        gevent.spawn(waiter, 1, bqueue),
        gevent.spawn(waiter, 2, bqueue),
        gevent.spawn(waiter, 3, bqueue),
    ])

if __name__ == '__main__':
    main()

Output:

waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi
waiter 1 hi
waiter 2 hi
waiter 3 hi

Upvotes: 1

Related Questions