Reputation: 145
I faced a problem using pydispatch module for communication between threads. I used an example provided here: https://sites.google.com/site/hardwaremonkey/blog/python-howtocommunicatebetweenthreadsusingpydispatch
I slightly modified it to provide a bit more detailed information in the log. In particular, I made it display the actual thread name as well:
from pydispatch import dispatcher
import threading
import time
import logging
log_formatter = logging.Formatter('%(asctime)s %(levelname)s [%(name)s] [%(threadName)s] %(message)s', '%H:%M:%S')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler()
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
ALICE_SIGNAL='alice_signal'
ALICE_SENDER='alice_sender'
BOB_SIGNAL='bob_signal'
BOB_SENDER='bob_sender'
class Alice():
''' alice procrastinates and replies to bob'''
def __init__(self):
logger.debug('Alice instantiated')
dispatcher.connect(self.alice_dispatcher_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
self.alice()
def alice_dispatcher_receive(self, message):
''' handle dispatcher'''
logger.debug('Alice has received message: {}'.format(message))
dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)
def alice(self):
''' loop and wait '''
while True:
logger.debug('Alice is procrastinating')
time.sleep(1)
class Bob():
''' bob contacts alice periodically '''
def __init__(self):
logger.debug('Bob instantiated')
dispatcher.connect(self.bob_dispatcher_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
self.bob()
def bob_dispatcher_receive(self, message):
''' handle dispatcher '''
logger.debug('Bob has received message: {}'.format(message))
def bob(self):
''' loop and send messages using a dispatcher '''
while True:
dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
time.sleep(3)
if __name__ == '__main__':
logger.debug('Starting...')
alice_thread = threading.Thread(target=Alice, name='Thread-Alice')
alice_thread.start()
bob_thread = threading.Thread(target=Bob, name='Thread-Bob')
bob_thread.start()
Here is what I've found:
08:10:43 DEBUG [root] [MainThread] Starting...
08:10:43 DEBUG [root] [Thread-Alice] Alice instantiated
08:10:43 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:43 DEBUG [root] [Thread-Bob] Bob instantiated
08:10:43 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:43 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:44 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:45 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:46 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:46 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:46 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:47 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:48 DEBUG [root] [Thread-Alice] Alice is procrastinating
08:10:49 DEBUG [root] [Thread-Bob] Alice has received message: message from Bob
08:10:49 DEBUG [root] [Thread-Bob] Bob has received message: thank you from Alice
08:10:49 DEBUG [root] [Thread-Alice] Alice is procrastinating
See this:
[Thread-Bob] Alice has received message: message from Bob
"Alice has received message" has been executed in Bob's thread. While I expected it to be executed in Alice's thread. From what I understand, dispatcher receives a signal from Bob and goes straight to calling the handlers, in the same thread. So it actually calls Alice's code from Bob's thread, resulting in unexpected nuances.
Problem #1. Bob's execution actually gets blocked while the signal is processed by Alice.
Problem #2. In a larger application Alice may have its code unexpectedly executed in multiple parallel threads.
Problem #3. Poor encapsulation in general. We expect Alice and Bob to be run in their own threads in one single instance each, independently from each other, only exchanging messages. Which is not the case here, as they actually call each other's code.
Is there a way to solve this for pydispatcher? Or can you recommend another library for communication between threads that is free of these problems?
Upvotes: 3
Views: 1508
Reputation: 145
Found a solution using event_loop.call_soon_threadsafe().
Here is the code now:
def register_signal_handler(loop, handler, signal, sender):
def dispatcher_receive(message):
loop.call_soon_threadsafe(handler, message)
dispatcher.connect(dispatcher_receive, signal=signal, sender=sender, weak=False)
class Alice():
def __init__(self):
logger.debug('Alice instantiated')
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
register_signal_handler(self.loop, self.alice_receive, signal=BOB_SIGNAL, sender=BOB_SENDER)
self.alice()
def alice_receive(self, message):
logger.debug('Alice has received message: {}'.format(message))
dispatcher.send(message='thank you from Alice', signal=ALICE_SIGNAL, sender=ALICE_SENDER)
def alice(self):
def procrastinate():
logger.debug('Alice is procrastinating')
self.loop.call_later(1, procrastinate)
procrastinate()
self.loop.run_forever()
class Bob():
def __init__(self):
logger.debug('Bob instantiated')
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
register_signal_handler(self.loop, self.bob_receive, signal=ALICE_SIGNAL, sender=ALICE_SENDER)
self.bob()
def bob_receive(self, message):
logger.debug('Bob has received message: {}'.format(message))
def bob(self):
def poke_alice():
dispatcher.send(message='message from Bob', signal=BOB_SIGNAL, sender=BOB_SENDER)
self.loop.call_later(3, poke_alice)
self.loop.call_later(3, poke_alice)
self.loop.run_forever()
So when a message arrives from Bob to Alice, the signal handler does not actually perform the job of handling the message, but only schedules execution of the actual handler that will perform the job. And the actual handler will get called in Alice's thread.
In this case:
So my goals here are achieved.
Do you guys think this is a good solution? Would love to hear a comment on this.
Upvotes: 1