Andriy Drozdyuk

Reputation: 61081

How to send RabbitMQ messages to Pykka actor?

UPDATE Aug 2015: For people wanting to use messaging, I would currently recommend ZeroMQ. It can be used in addition to, or as a complete replacement for, Pykka.

How can I listen to a RabbitMQ queue for messages and then forward them to an actor within Pykka?

Currently, when I try to do so, I get weird behavior and the system grinds to a halt.

Here is how I have my actor implemented:

from pykka import eventlet  # assumed import; the original snippet omitted it


class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()

        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)

And here is my method inside the ApplicationService class that is supposed to check the queue for new messages:

@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')

    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    # start_consuming() blocks the calling thread until consuming is stopped.
    channel.start_consuming()

It seems like start_consuming is blocking indefinitely. Is there a way I can "poll" the queue periodically myself?
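
For reference, a minimal sketch of what such polling could look like, assuming pika's `basic_get` on a blocking channel, which returns `(None, None, None)` when the queue is empty instead of blocking. The helper names `poll_once` and `drain`, the `forward` callable and the `limit` parameter are hypothetical and not part of pika or Pykka; `channel` is expected to be a channel opened as in the method above.

```python
import pickle


def poll_once(channel, queue_name, forward):
    """Fetch at most one message and hand it to forward().

    Returns True if a message was handled, False if the queue was empty.
    """
    # Unlike start_consuming(), basic_get returns immediately; an empty
    # queue is reported as (None, None, None).
    method, properties, body = channel.basic_get(queue_name)
    if method is None:
        return False
    forward(pickle.loads(body))
    # Acknowledge only after the message has been handed over.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return True


def drain(channel, queue_name, forward, limit=100):
    """Poll until the queue is empty or `limit` messages were handled."""
    handled = 0
    while handled < limit and poll_once(channel, queue_name, forward):
        handled += 1
    return handled
```

With the actor from the question, `forward` could be `actor.tell`, and `drain(channel, 'test', actor.tell)` could be called on whatever schedule suits the application. How often to poll is a design choice this sketch leaves open.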

Upvotes: 4

Views: 899

Answers (1)

erik-e

Reputation: 3891

All your code looks correct to me. If you would like to check the queue used by each actor, you can check their actor_inbox property available on the actor reference returned from Actor#start.

I have run into similar issues when inheriting from EventletActor, so to test I tried the same code using an EventletActor and using a ThreadingActor. As far as I can tell from the source code they both are using eventlet to do work. The ThreadingActor works great for me, but the EventletActor doesn't work with ActorRef#tell; it does work with ActorRef#ask.

I started with two files in the same directory as shown below.

my_actors.py: Initializes two actors which will respond to messages by printing the message content prefaced by their class name.

from pykka.eventlet import EventletActor
import pykka


class MyThreadingActor(pykka.ThreadingActor):
    def __init__(self):
        super(MyThreadingActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyThreadingActor Received: {message}".format(
                message=message)
        )


class MyEventletActor(EventletActor):
    def __init__(self):
        super(MyEventletActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyEventletActor Received: {message}".format(
                message=message)
        )


my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()

my_queue.py: Sets up a queue in pika and sends a message to the queue, which is forwarded to the two actors set up earlier. After each actor is told about the message, its actor inbox is printed to show whether anything is still waiting in it.

from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika


def on_message(channel, method_frame, header_frame, body):
    print "Received Message", body
    my_threading_actor_ref.tell({"msg": body})
    my_eventlet_actor_ref.tell({"msg": body})

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox

    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


queue_name = 'test'
connection = pika.BlockingConnection()

channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

    # It is very important to stop these actors, otherwise you may lockup
    my_threading_actor_ref.stop()
    my_eventlet_actor_ref.stop()
connection.close()

When I run my_queue.py the output is as follows:

Received Message A Message
ThreadingActor Inbox <Queue.Queue instance at 0x10bf55878>
MyThreadingActor Received: {'msg': 'A Message'}
EventletActor Inbox <Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

When I hit CTRL+C to stop the queue, I notice that the EventletActor finally receives the message and prints it:

^CMyEventletActor Received: {'msg': 'A Message'}

All this leads me to believe that there may be a bug in EventletActor. I think your code is fine; if there is a bug, I was unable to find it in the Pykka source on first inspection.
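
Since the ThreadingActor did receive messages in my test, one possible workaround is to keep the blocking consumer loop off the actor's own thread. The following is only a sketch under that assumption: `start_in_background` is a hypothetical helper, not part of Pykka or pika, and it uses nothing but the standard library.

```python
import threading


def start_in_background(blocking_call, *args):
    """Run a blocking callable in a daemon thread and return the thread.

    The caller is not blocked, so an actor that starts the consumer this
    way can keep processing its own inbox.
    """
    thread = threading.Thread(target=blocking_call, args=args)
    # Daemon threads do not keep the interpreter alive on shutdown.
    thread.daemon = True
    thread.start()
    return thread
```

With the code from the question this could be called as `start_in_background(ApplicationService.listen_for_events, actor_ref)`, where `actor_ref` refers to a ThreadingActor. Because listen_for_events opens its pika connection inside the call, the connection is created and used in the same thread, which matters because pika connections are not thread-safe.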

I hope this information helps.

Upvotes: 3
