Reputation: 249
I was just wondering: how would you run two loops so that their iterations alternate, one after the other? I am aware that multithreading is a thing, and I am familiar with it. One thing I cannot figure out is how to run one loop after another.
For example, say I had 2 functions:
import time

def loop_a():
    while True:
        time.sleep(1)
        print("a")

def loop_b():
    while True:
        print("b")
How do I get the output to be ababababababababa, even with the time.sleep(1) present in the first function?
I am using mpi4py, and was wondering if there is any way to do this using that library. My actual program requires messages to be sent between functions. Otherwise, using any other Python library such as multiprocessing should be fine.
Is there a way to do this using threading?
Upvotes: 4
Views: 3715
Reputation: 138
import asyncio

q = asyncio.Queue()

async def loop_a(q):
    for i in range(10):
        value = await q.get()
        print(value)

async def loop_b(q):
    for i in range(10):
        await q.put("a")
        print("b")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(loop_a(q), loop_b(q)))
Upvotes: 2
Reputation: 28332
In pseudocode:
main()
1. set lock for loop1
2. start loop1 on background thread
3. start loop2 on background thread
4. wait
loop1()
1. do the following forever:
2. acquire lock for loop1
3. print 'a'
4. release lock for loop2
loop2()
1. do the following forever:
2. acquire lock for loop2
3. print 'b'
4. release lock for loop1
You can implement the lock as a shared memory variable or as a loop waiting to get a message from your peer or whatever. Acquiring the lock would mean blocking or spin-locking (polling) until the lock is ready; releasing the lock would be setting the shared variable appropriately or sending the right message to the right peers.
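For example, a minimal sketch of the blocking variant with threading.Event objects standing in for the locks (the names lock_a and lock_b are just for illustration):

import threading
import time

# One event per loop; a set event means that loop holds the "lock".
lock_a = threading.Event()
lock_b = threading.Event()

def loop_a():
    while True:
        lock_a.wait()    # acquire: block until it's our turn
        lock_a.clear()
        time.sleep(1)
        print("a")
        lock_b.set()     # release the lock for loop_b

def loop_b():
    while True:
        lock_b.wait()    # acquire: block until it's our turn
        lock_b.clear()
        print("b")
        lock_a.set()     # release the lock for loop_a

lock_a.set()                                          # set lock for loop_a
threading.Thread(target=loop_a, daemon=True).start()
threading.Thread(target=loop_b, daemon=True).start()
time.sleep(10)                                        # wait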
Edit: based on a comment, here's a fuller development of loop1() and loop2() using one of many available implementation strategies:
(shared lock in global scope)
main()
1. lock = 1
2. start loop1 on background thread
3. start loop2 on background thread
4. wait
loop1()
1. do the following forever
2. loop until lock = 1
3. print 'a'
4. lock = 2
loop2()
1. do the following forever
2. loop until lock = 2
3. print 'b'
4. lock = 1
This implementation uses spin locks and relies on a thread-safe shared variable lock to coordinate work. Spin locks may or may not be suitable for your application; you can combine them with a blocking mechanism to cut down on CPU use at the cost of some delay in processing.
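A direct Python translation of that pseudocode might look like this (a sketch only; in CPython the single shared integer is safe enough here because each thread only writes the value the other one is polling for):

import threading
import time

lock = 1  # 1 = loop1's turn, 2 = loop2's turn

def loop1():
    global lock
    while True:
        while lock != 1:   # spin-lock: burns CPU while polling
            pass
        print("a")
        lock = 2           # hand the turn to loop2

def loop2():
    global lock
    while True:
        while lock != 2:   # spin-lock: burns CPU while polling
            pass
        print("b")
        lock = 1           # hand the turn back to loop1

threading.Thread(target=loop1, daemon=True).start()
threading.Thread(target=loop2, daemon=True).start()
time.sleep(2)              # wait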
The key is that the lock is stateful and (should) only be acquired by the right thread. You can do the same thing with message passing if each thread is made "aware" of the "next" thread and messages it when done... and then all threads wait to get messaged. A Python sketch follows the pseudocode below.
main()
1. start loop1 on background thread
2. start loop2 on background thread
3. message loop1
4. wait
loop1()
1. do the following forever
2. loop until message received
3. print 'a'
4. message loop2
loop2()
1. do the following forever
2. loop until message received
3. print 'b'
4. message loop1
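And a sketch of that message-passing version with standard-library queues, one inbox per thread (the token passed around is arbitrary):

import queue
import threading
import time

inbox1 = queue.Queue()
inbox2 = queue.Queue()

def loop1():
    while True:
        inbox1.get()        # loop until message received
        print("a")
        inbox2.put("go")    # message loop2

def loop2():
    while True:
        inbox2.get()        # loop until message received
        print("b")
        inbox1.put("go")    # message loop1

threading.Thread(target=loop1, daemon=True).start()
threading.Thread(target=loop2, daemon=True).start()
inbox1.put("go")            # message loop1 to start the cycle
time.sleep(2)               # wait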
Upvotes: 0
Reputation: 2320
Here is a solution to the first part of your question: how to run processes in parallel so that each one waits for the previous one to finish before starting its work. I didn't address the message-passing aspect here, as it seems a bit vague to me and could be implemented differently depending on the problem statement. In this example, we create and run three workers which emulate execution with simple time delays. The code snippets should be saved into a single file which can be run from the command line.
We start by importing the required modules:
#!/usr/bin/env python3
import time
from multiprocessing import Process, Event
and implementing a WorkerQueue class. This class keeps workers in the right order and is responsible for starting and terminating them. The communication between workers is implemented using events. Each worker has other_ready and ready Event fields, which indicate the completion status of the previous worker and of the current worker, respectively. Note that if there is only one worker in the queue, its other_ready and ready are the same event.
class WorkerQueue(object):
    def __init__(self):
        self._workers = []

    def add_worker(self, worker):
        if self._workers:
            worker.other_ready = self._workers[-1].ready
            self._workers[0].other_ready = worker.ready
        else:
            worker.other_ready = worker.ready
        self._workers.append(worker)

    def start_workers(self):
        if not self._workers:
            return
        self._workers[0].other_ready.set()
        for w in self._workers:
            w.start()

    def stop_workers(self):
        for w in self._workers:
            w.join()
Then, we implement the worker itself by inheriting from the Process class. Note that it's also possible to use threading instead of multiprocessing; in that case, the only thing that changes is the Worker parent class: Thread instead of Process. A minimal sketch of that swap follows the class definition below.
class Worker(Process):
    def __init__(self, delay, name=None):
        super().__init__(name=name)
        self.delay = delay
        self.other_ready = Event()
        self.other_ready.set()
        self.ready = Event()
        self.stop = Event()

    def run(self):
        while not self.stop.is_set():
            try:
                self.other_ready.wait()
                t = time.strftime('%H:%M:%S')
                print('Started:', self.name, t, flush=True)
                time.sleep(self.delay)
                t = time.strftime('%H:%M:%S')
                print('Finished:', self.name, t, flush=True)
            except:
                break
            self.other_ready.clear()
            self.ready.set()

    def join(self, timeout=None):
        self.stop.set()
        super().join(timeout)
Here you can see that each worker waits for the previous one to be ready before starting to execute commands. By default, other_ready is set, so that we don't hit a deadlock in the case of a single worker in the queue.
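As noted above, switching to threads is only a matter of swapping the imports and the base class; a minimal sketch of that swap (assuming the body of the class stays exactly as written above):

# threading-based variant: same Event-based API, same class body.
from threading import Thread, Event

class Worker(Thread):  # was: class Worker(Process)
    # __init__, run, and join are unchanged from the Process version above.
    ...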
Finally, we implement a main function where we define the workers, add them to the worker queue, and start them.
def main():
    first = Worker(delay=1, name='first')
    second = Worker(delay=3, name='second')
    third = Worker(delay=2, name='third')

    queue = WorkerQueue()
    for w in (first, second, third):
        queue.add_worker(w)
    queue.start_workers()

    try:
        # The main infinite loop, do something useful:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        queue.stop_workers()
Don't forget to add the following lines to the end of the file:
if __name__ == '__main__':
    main()
Now it's ready to be saved to a file, say proc_queue.py, which you can run from the command line to see the results:
$ python3 proc_queue.py
Started: first 16:04:09
Finished: first 16:04:10
Started: second 16:04:10
Finished: second 16:04:13
Started: third 16:04:13
Finished: third 16:04:15
Started: first 16:04:15
Finished: first 16:04:16
Started: second 16:04:16
Finished: second 16:04:19
Started: third 16:04:19
Finished: third 16:04:21
^C
It might be a bit overcomplicated, but it's the only solution I could come up with. If you know a better approach, I'd be happy to learn about it :)
Upvotes: 0
Reputation: 42786
You can use coroutines:
import asyncio

q = asyncio.Queue()

async def loop_a(q):
    for i in range(10):
        value = await q.get()
        print(value)

async def loop_b(q):
    for i in range(10):
        await q.put("a")
        print("b")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(loop_a(q), loop_b(q)))
Here you have the live example. The only thing is that the order of execution is not guaranteed unless you sync them somehow.
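If you do need a strict ababab... ordering, one way is to bounce the turn between the coroutines, for example with two asyncio.Event objects (a sketch; the turn_a/turn_b names are just for illustration):

import asyncio

async def main():
    turn_a = asyncio.Event()
    turn_b = asyncio.Event()

    async def loop_a():
        for _ in range(10):
            await turn_a.wait()   # wait for our turn
            turn_a.clear()
            print("a")
            turn_b.set()          # hand the turn to loop_b

    async def loop_b():
        for _ in range(10):
            await turn_b.wait()   # wait for our turn
            turn_b.clear()
            print("b")
            turn_a.set()          # hand the turn back to loop_a

    turn_a.set()                  # loop_a goes first
    await asyncio.gather(loop_a(), loop_b())

asyncio.run(main())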
Upvotes: 1