Reputation: 1124
I'm trying to light a 5mm LED while a function is running. When this function (more details about this below) is finished and has returned a value I would like to break the while loop.
Current code for while loop:
import time
import RPi.GPIO as GPIO  # assumed: the question does not show its imports
pins = [3, 5, 8, 15, 16]
def piBoard():
    finished = 0
    while finished != 10:
        for pin in pins:
            GPIO.output(pin, GPIO.HIGH)
            time.sleep(0.1)
            GPIO.output(pin, GPIO.LOW)
        finished += 1
In the example above I just run the while loop until the count reaches 10, which is not best practice. I would like the while loop to break once my next function has returned a value.
The function whose return should break the while loop:
from threading import Thread
def myFunction():
    Thread(target=piBoard).start()
    # Trying to recognize the song
    return song  # placeholder: the song which is recognized
Thanks, - K.
Upvotes: 0
Views: 3228
Reputation: 66
Using a decorator and asyncio, inspired by @Eric Ed Lohmar:
import asyncio
from functools import wraps
def Blink():
    async def _blink():
        while True:
            print("OFF")
            await asyncio.sleep(.5)
            print("ON")
            await asyncio.sleep(.5)
    def Blink_decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # blink in the background while func runs
            blinker = asyncio.ensure_future(_blink())
            try:
                return await func(*args, **kwargs)
            finally:
                blinker.cancel()  # stop blinking once func has finished
        return wrapper
    return Blink_decorator
@Blink()
async def longTask():
    print("Mission Start")
    await asyncio.sleep(3)
    print("Mission End")
def main():
    asyncio.run(longTask())
if __name__ == "__main__":
    main()
Upvotes: 0
Reputation: 1162
If you are open to using threads, you can achieve this with a shared status flag. Here's some example code:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
WORK_FINISHED = False
def piBoard():
    while not WORK_FINISHED:
        # Do some stuff
        # Drink some coffee
        time.sleep(0.1)
def myFunction():
    global WORK_FINISHED
    time.sleep(5)
    WORK_FINISHED = True  # update global status flag
    return something  # placeholder for your real return value
if __name__ == '__main__':
    futures = []
    MAX_WORKERS = 5  # max number of threads you want to create
    with ThreadPoolExecutor(MAX_WORKERS) as executor:
        executor.submit(piBoard)
        # submit your function to worker thread
        futures.append(executor.submit(myFunction))
        # if you need to get return value from `myFunction`
        for fut in as_completed(futures):
            res = fut.result()
Hope this helps.
Upvotes: 0
Reputation: 4992
You need some kind of inter-thread communication. threading.Event is about as simple as you can get.
import threading
song_recognized_event = threading.Event()
In your song recognizer, call set() once the song is recognized.
In your LED loop, check is_set() occasionally while toggling LEDs.
while not song_recognized_event.is_set():
    # toggle LEDs
Call clear() to reset it.
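For illustration, here is a minimal runnable sketch of the Event approach. It assumes a hypothetical recognize_song() that stands in for the real recognition work, and it records LED states in a list rather than driving GPIO pins, so it runs on any machine.

```python
import threading
import time

song_recognized_event = threading.Event()
toggles = []  # records LED states instead of driving real GPIO pins


def blink_leds():
    # Keep toggling until the recognizer signals that it is done.
    while not song_recognized_event.is_set():
        toggles.append("ON")
        # wait() acts as a sleep that ends early once the event is set
        song_recognized_event.wait(0.05)
        toggles.append("OFF")


def recognize_song():
    # Hypothetical stand-in for the real recognition work.
    time.sleep(0.2)
    return "some song"


def my_function():
    song_recognized_event.clear()  # reset in case of an earlier run
    blinker = threading.Thread(target=blink_leds)
    blinker.start()
    try:
        return recognize_song()
    finally:
        song_recognized_event.set()  # tell the LED loop to stop
        blinker.join()               # wait for the last toggle to finish


result = my_function()
```

Setting the event in a finally block means the LED loop also stops if the recognizer raises an exception.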
Upvotes: 0
Reputation: 1922
It sounds to me like you want to write a class that extends Thread and implements __enter__ and __exit__ methods to make it work in the with statement. Simple to implement, simple syntax, works pretty well. The class will look like this:
import threading
import time
class Blinky(threading.Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self._finished = False
    def __enter__(self):
        self.start()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
    def run(self):
        # turn light on
        while not self._finished:
            time.sleep(.5)
        # turn light off
    def stop(self):
        self._finished = True
Then, to run your function, you simply put:
with Blinky():
    my_function()
The light should turn on once the with statement is reached and turn off up to half a second after the context of the with is exited.
Upvotes: 2
Reputation: 386
Use True as the while condition, and inside the loop add an if statement that checks whether your function has returned a value. If it has, break.
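One way to sketch this idea, assuming the function runs in a worker thread so the loop can poll it: concurrent.futures gives back a Future whose done() method reports whether the function has returned. Here my_function() and blink_once() are hypothetical stand-ins for the song recognition and the LED toggling.

```python
import time
from concurrent.futures import ThreadPoolExecutor

blinks = []  # records blinks instead of driving real GPIO pins


def my_function():
    # Hypothetical stand-in for the song recognition.
    time.sleep(0.2)
    return "some song"


def blink_once():
    # Placeholder for switching the LED on and off.
    blinks.append("blink")
    time.sleep(0.05)


def blink_until_done(future):
    # Loop forever; leave as soon as the background call has returned.
    while True:
        if future.done():
            break
        blink_once()
    return future.result()


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(my_function)
    song = blink_until_done(future)
```

The loop only notices completion between blinks, so keep each blink short if the LED should stop promptly.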
Upvotes: 0