Reputation: 10101
I'm reading serial data with a while loop. However, I have no control over the sample rate.
The code itself seems to take 0.2s to run, so I know I won't be able to go any faster than that. But I would like to be able to control precisely how much slower I sample.
I feel like I could do it using sleep(), but the problem is that at different points the loop itself may take longer to run (depending on precisely what is being transmitted over the serial connection), so the code would have to make up the balance.
For example, let's say I want to sample every 1s, and the loop takes anywhere from 0.2s to 0.3s to run. My code needs to be smart enough to sleep for 0.8s (if the loop takes 0.2s) or 0.7s (if the loop takes 0.3s).
import serial
import csv
import time

# open serial stream (ser), output file (outfile) and start time (zero) here

while True:
    # read and print a line
    sample_value = ser.readline()
    sample_time = time.time() - zero
    sample_line = str(sample_time) + ',' + str(sample_value)
    outfile.write(sample_line)
    print 'time: ', sample_time, ', value: ', sample_value
Upvotes: 5
Views: 8977
Reputation: 7011
The trouble with approaches like this:
start = time.time()
sample()
end = time.time()
time.sleep(sample_period - (end - start))
is that minor errors (e.g. from the time between measuring end and the sleep call, or from sleep staying asleep longer than requested) accumulate over time.
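To see the drift build up, here is a rough, self-contained sketch (the 20 ms of simulated work and the 0.1 s period are made-up numbers; how much it drifts depends on your platform's sleep granularity):

import time

sample_period = 0.1
iterations = 100
start = time.time()
for _ in range(iterations):
    t0 = time.time()
    time.sleep(0.02)                     # stand-in for the actual sampling work
    elapsed = time.time() - t0
    time.sleep(sample_period - elapsed)  # naive "make up the balance" sleep

drift = (time.time() - start) - iterations * sample_period
print("accumulated drift: %.4f s" % drift)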
An IMO better approach is to work out when the next sample time should be, and sleep until then:
next_sample_time = time.time()
while True:
    next_sample_time += sample_period
    time.sleep(next_sample_time - time.time())
    sample()
Since the next_sample_time calculation is not based on time.time() at all, our error will only be in how accurately we hit that time, and won't accumulate over loop iterations.
One potential bad behaviour with this code is that if some samples are slower than the sampling interval, or the code is suspended and then resumed, it'll accumulate a "backlog" of elapsed sample times, and then sample them all as quickly as possible until it catches up. If you'd prefer to skip sampling in these cases, you can do something like this:
next_sample_time = time.time()
while True:
    next_sample_time += sample_period
    time_to_sleep = next_sample_time - time.time()
    if time_to_sleep < 0:
        continue
    time.sleep(time_to_sleep)
    sample()
or, if the backlog somehow gets really large and you need to clear it more efficiently:
import math

next_sample_time = time.time()
while True:
    next_sample_time += sample_period
    time_to_sleep = next_sample_time - time.time()
    samples_to_skip = math.ceil(-time_to_sleep / sample_period)
    if samples_to_skip > 0:
        next_sample_time += samples_to_skip * sample_period
        time_to_sleep = next_sample_time - time.time()
    time.sleep(time_to_sleep)
    sample()
(though I haven't tested this and I think the simpler thing is probably usually better).
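For reference, here is the catch-up variant as a self-contained sketch, with a hypothetical sample() that just sleeps and prints standing in for the real work, and a made-up one-second period:

import math
import time

sample_period = 1.0  # hypothetical one-second sampling interval

def sample():
    # stand-in for the real sampling work; takes a variable amount of time
    time.sleep(0.25)
    print("sampled at %.3f" % time.time())

next_sample_time = time.time()
while True:
    next_sample_time += sample_period
    time_to_sleep = next_sample_time - time.time()
    # if we have fallen behind, jump past the missed intervals instead of replaying them
    samples_to_skip = math.ceil(-time_to_sleep / sample_period)
    if samples_to_skip > 0:
        next_sample_time += samples_to_skip * sample_period
        time_to_sleep = next_sample_time - time.time()
    time.sleep(time_to_sleep)
    sample()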
Upvotes: 0
Reputation: 1139
In asyncio, I had good results using this:
import asyncio
import time
from typing import AsyncIterator

async def precise_iteration_frequency(frequency: float) -> AsyncIterator[None]:
    """A generator to iterate over at a fixed frequency.

    asyncio.sleep might end up sleeping too long, for whatever reason. Maybe there are
    other async function calls that take longer than expected in the background.
    """
    sleep = 1 / frequency
    corrected_sleep = sleep
    error = 0
    while True:
        start = time.time()
        yield
        corrected_sleep -= error
        await asyncio.sleep(corrected_sleep)
        error = (time.time() - start) - sleep
for example:
async def example():
    async for _ in precise_iteration_frequency(60):
        print("This is printed 60 times per second")
Upvotes: 0
Reputation: 77157
At the beginning of the loop, check whether the appropriate amount of time has passed. If it has not, sleep.
# Set up initial conditions for sample_time outside the loop
sample_period = ???
next_min_time = 0

while True:
    sample_time = time.time() - zero
    if sample_time < next_min_time:
        time.sleep(next_min_time - sample_time)
        continue
    # read and print a line
    sample_value = ser.readline()
    sample_line = str(sample_time) + ',' + str(sample_value)
    outfile.write(sample_line)
    print 'time: {}, value: {}'.format(sample_time, sample_value)
    next_min_time = sample_time + sample_period
Upvotes: 0
Reputation: 11012
A rather elegant method if you're working on UNIX: use the signal library.
The code:
import signal

def _handle_timeout(signum, frame):
    print "timeout hit"  # do nothing here

def second(count):
    signal.signal(signal.SIGALRM, _handle_timeout)
    signal.alarm(1)
    try:
        count += 1  # put your function here
        signal.pause()
    finally:
        signal.alarm(0)
    return count

if __name__ == '__main__':
    count = 0
    count = second(count)
    count = second(count)
    count = second(count)
    count = second(count)
    count = second(count)
    print count
And the timing:
georgesl@cleese:~/Bureau$ time python timer.py
5
real 0m5.081s
user 0m0.068s
sys 0m0.004s
Two caveats, though: it only works on *nix, and it is not multithread-safe.
Upvotes: 2
Reputation: 101142
Just measure how long your code takes on every iteration of the loop, and sleep accordingly:
import time

while True:
    now = time.time()            # get the time
    do_something()               # do your stuff
    elapsed = time.time() - now  # how long was it running?
    time.sleep(1. - elapsed)     # sleep accordingly so the full iteration takes 1 second
Of course it's not 100% perfect (maybe off by a millisecond or so from time to time), but I guess it's good enough.
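One detail to guard against: if do_something() ever takes longer than a second, 1. - elapsed goes negative and time.sleep() rejects negative values, so clamping it is a cheap safety net:

time.sleep(max(0., 1. - elapsed))  # never pass sleep() a negative duration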
Another nice approach is using twisted's LoopingCall:
from twisted.internet import task
from twisted.internet import reactor

def do_something():
    pass  # do your work here

task.LoopingCall(do_something).start(1.0)
reactor.run()
Upvotes: 11