Reputation: 13
I have this application that shares work across multiple devices and starts processes on them, they often need to synchronize to each other.
After that the only way each agent will communicate is with MQTT (Publish / subscribe on topics). The MQTT library I'm using is Paho-MQTT.
My current issue is that I can't find a good way to make them wait. I feel like while true : sleep
is not a good way of doing it but I have no clue on how to do it better.
My current code looks like this:
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
So far my only idea would be to find the best wait delay by dichotomy tho it will be time consuming so I'm hoping on a better solution.
Note:
def wait():
while self.wake_up_token == 0:
pass
is a terrible solution because the process will be constantly in competition to check the condition and it makes the system thousands of times slower.
Minimal example I guess: (1 agent, 1 scheduler)
from paho.mqtt.client import Client
from time import sleep
class Agent:
def __init__(self, broker_ip: str, client_id: str):
self.client: Client = Client(client_id=client_id)
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("wake_up_topic")
self.client.message_callback_add("wake_up_topic", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
# do something
self.client.publish(topic="agent/action_done", message="")
self.wait()
# do something
class Scheduler:
def __init__(self, broker_ip: str):
self.client: Client = Client(client_id="scheduler")
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("agent/action_done")
self.client.message_callback_add("agent/action_done", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token += 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
#agents are doing their thing
self.wait()
self.client.publish(topic="wake_up_topic", message="")
I don't know if it is useful, but the goal of the whole project is to build a MAS frameworks that shares the agents across multiple servers.
Upvotes: 0
Views: 106
Reputation: 5093
I think a standard Semaphore is most useful in this case. I can't test this, since I don't have an MQTT client/server running, but it should be something like this:
import threading
class Scheduler:
def __init__(self, broker_ip: str):
# your init code here (unchanged), but add this line:
self.semaphore = threading.Semaphore(0)
def wake_up(self, client, userdata, message) -> None:
self.semaphore.release()
def wait(self) -> None:
self.semaphore.acquire()
Upvotes: 1