Reputation: 61081
UPDATE (Aug 2015): For anyone who wants to use messaging, I would currently recommend ZeroMQ. It can be used alongside Pykka or as a complete replacement for it.
How can I listen to a RabbitMQ queue for messages and then forward them to an actor within Pykka?
Currently, when I try to do so, I get weird behavior and the system grinds to a halt.
Here is how I have my actor implemented:
class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()
        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)
And here is my method inside the ApplicationService class that is supposed to check the queue for new messages:
@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')

    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()
It seems like start_consuming is blocking indefinitely. Is there a way I can "poll" the queue periodically myself?
Upvotes: 4
Views: 899
Reputation: 3891
All your code looks correct to me. If you would like to inspect the queue used by each actor, you can check the actor_inbox property on the actor reference returned from Actor#start.
I have run into similar issues when inheriting from EventletActor, so to test I ran the same code with both an EventletActor and a ThreadingActor. As far as I can tell from the source code, the two differ mainly in how they are scheduled: the ThreadingActor runs on a regular thread, while the EventletActor runs on an eventlet green thread. The ThreadingActor works great for me, but the EventletActor does not work with ActorRef#tell, although it does work with ActorRef#ask.
I started with two files in the same directory as shown below.
my_actors.py: Initializes two actors which will respond to messages by printing the message content prefaced by their class name.
from pykka.eventlet import EventletActor
import pykka


class MyThreadingActor(pykka.ThreadingActor):
    def __init__(self):
        super(MyThreadingActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyThreadingActor Received: {message}".format(
                message=message)
        )


class MyEventletActor(EventletActor):
    def __init__(self):
        super(MyEventletActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyEventletActor Received: {message}".format(
                message=message)
        )


my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()
my_queue.py: Sets up a queue in pika and sends a message to the queue, which is forwarded to the two actors set up above. After each actor is told about the message, its current actor inbox is checked for anything still in the queue.
from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika


def on_message(channel, method_frame, header_frame, body):
    print "Received Message", body
    my_threading_actor_ref.tell({"msg": body})
    my_eventlet_actor_ref.tell({"msg": body})
    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


queue_name = 'test'
connection = pika.BlockingConnection()
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

# It is very important to stop these actors, otherwise you may lockup
my_threading_actor_ref.stop()
my_eventlet_actor_ref.stop()
connection.close()
When I run my_queue.py the output is as follows:
Received Message A Message
ThreadingActor Inbox <Queue.Queue instance at 0x10bf55878>
MyThreadingActor Received: {'msg': 'A Message'}
EventletActor Inbox <Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>
When I hit CTRL+C to stop the queue, I notice that the EventletActor finally receives the message and prints it:
^C
MyEventletActor Received: {'msg': 'A Message'}
All this leads me to believe that there may be a bug in EventletActor. I think your code is fine, and that the bug is somewhere I was unable to find on a first inspection of the source.
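One possible explanation, offered as a hypothesis rather than something I verified in Pykka's source: eventlet green threads are scheduled cooperatively, so an EventletActor only gets to drain its inbox when the code that is currently running yields. If the blocking pika consumer never yields, a message handed over with tell sits in the inbox until the consumer stops, which would match the output above. An actor on a preemptively scheduled OS thread would not be affected in the same way. The sketch below reproduces that effect using only the standard library, with asyncio standing in for eventlet; the names actor, inbox and main are illustrative and not part of Pykka or pika.

```python
import asyncio
import time


async def actor(inbox, received):
    # Stand-in for an actor loop: wait for a message, then record it.
    while True:
        message = await inbox.get()
        received.append(message)


async def main():
    inbox = asyncio.Queue()
    received = []
    task = asyncio.create_task(actor(inbox, received))

    # Like tell(): enqueue the message and return immediately.
    inbox.put_nowait({"msg": "A Message"})

    # Blocking call that never yields to the scheduler, standing in
    # for a blocking consumer loop. The actor cannot run during this.
    time.sleep(0.1)
    seen_while_blocked = list(received)

    # Yield control; only now does the actor get a chance to run.
    await asyncio.sleep(0.01)
    seen_after_yield = list(received)

    # Shut the actor loop down cleanly.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen_while_blocked, seen_after_yield


seen_while_blocked, seen_after_yield = asyncio.run(main())
```

If this is what is happening, it would also explain why ask appears to work: waiting on the reply is itself a point where the caller yields, so the actor gets scheduled.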
I hope this information helps.
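On the original question of polling the queue instead of blocking in start_consuming: pika channels have a basic_get method that fetches at most one message and returns (None, None, None) when the queue is empty. Here is a minimal sketch of a polling helper built on it. The drain function is my own illustrative name, and FakeChannel and FakeMethod are hypothetical in-memory stand-ins so the sketch runs without a broker; with a real channel you would pass the channel from connection.channel() instead.

```python
def drain(channel, queue_name, forward):
    """Forward every message currently waiting on queue_name; return the count."""
    count = 0
    while True:
        # basic_get returns (None, None, None) when the queue is empty.
        method, _properties, body = channel.basic_get(queue_name)
        if method is None:
            return count
        forward(body)
        # Acknowledge only after the message has been handed over.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        count += 1


class FakeMethod:
    """Hypothetical stand-in for the method frame pika returns."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


class FakeChannel:
    """Hypothetical in-memory stand-in for a pika channel (demo only)."""

    def __init__(self, bodies):
        self._bodies = list(bodies)
        self._next_tag = 0
        self.acked = []

    def basic_get(self, queue):
        if not self._bodies:
            return None, None, None
        self._next_tag += 1
        return FakeMethod(self._next_tag), None, self._bodies.pop(0)

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


forwarded = []
channel = FakeChannel([b"first", b"second"])
drained = drain(channel, "test", forwarded.append)
```

With a real setup, forward could be something like lambda body: actor.tell(pickle.loads(body)), and drain would be called on a timer. Inside an EventletActor the pause between calls should probably be eventlet.sleep rather than time.sleep, so that other green threads get a chance to run.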
Upvotes: 3