I am writing code that uses the NI-DAQmx Python API to generate an output signal on NI's USB-6314. I want to be able to write a signal (a sine wave, for example) and, if needed, stop the write partway through and start writing a different signal (basically, update the signal being output on the fly).
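To make the goal concrete, here is a hardware-free sketch of the behavior I am after. It does not use NI-DAQmx at all: `fake_write` is a hypothetical stand-in for a blocking write, and `write_in_chunks` and `run_demo` are names made up for this illustration. I do not know whether splitting the signal into chunks like this is the right approach with a real task.

```python
import threading
import time


def fake_write(chunk, seconds_per_sample=0.001):
    """Hypothetical stand-in for a blocking hardware write (not NI-DAQmx)."""
    time.sleep(len(chunk) * seconds_per_sample)


def write_in_chunks(write_chunk, samples, chunk_size, stop_event):
    """Send samples chunk by chunk and return how many samples were sent.

    A stop request is only noticed between chunks, so the worst-case
    latency is the time it takes to write one chunk.
    """
    sent = 0
    while sent < len(samples) and not stop_event.is_set():
        chunk = samples[sent:sent + chunk_size]
        write_chunk(chunk)  # blocks for one chunk only
        sent += len(chunk)
    return sent


def run_demo(total_samples=1000, chunk_size=50, stop_after=0.2):
    """Start a chunked write in a thread and interrupt it from the main thread."""
    samples = [0.0] * total_samples
    stop_event = threading.Event()
    result = {}

    def worker():
        result["sent"] = write_in_chunks(fake_write, samples, chunk_size, stop_event)

    start = time.monotonic()
    thread = threading.Thread(target=worker)
    thread.start()
    time.sleep(stop_after)  # let the write run for a while
    stop_event.set()        # ask the writer to stop
    thread.join()
    result["elapsed"] = time.monotonic() - start
    return result


if __name__ == "__main__":
    outcome = run_demo()
    print("samples sent:", outcome["sent"])
    print("elapsed: {:.2f}s".format(outcome["elapsed"]))
```

With the default arguments the full write would take about 1 second, and the stop request arrives after 0.2 seconds, so only part of the signal is sent. That is the kind of early stop I would like to get from the real device.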
To figure this out, I have tried implementing threading with the following code:
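import threading
import time

import nidaqmx

import util  # my own helper module (signal parameters and generator)


def test_stop(task):
    # Wait 2 seconds, then try to stop the running task
    print("stopping in 2 seconds")
    time.sleep(2)
    print("attempting to stop")
    task.stop()
    print("task has stopped")
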
device = "SimDev"
print("Signal parameters:"
      "\nAmplitude: {}"
      "\nFrequency: {}"
      "\nDuration: {}".format(util.ampl, util.freq, util.duration))
dt, signal = util.fct1(util.ampl, util.freq, util.duration, util.sampling_rate)

# WRITE in thread - STOP in main
start_time = time.time()
print("\n=================================\nWRITE in thread - STOP in main")
with nidaqmx.Task() as task:
    task.ao_channels.add_ao_voltage_chan(device + "/ao0")
    task.timing.cfg_samp_clk_timing(util.sampling_rate)
    task.start()
    # Write the signal from a worker thread
    thread1 = threading.Thread(target=task.write, args=(signal,))
    thread1.start()
    print("thread start")
    # Try to stop the task from the main thread after 2 seconds
    test_stop(task)
stop_time = time.time()
print("\nOverall time: {:.2f}s"
      "\nSignal duration: {}s".format(stop_time - start_time, util.duration))
With this code, I would expect the overall time to be about 2 seconds (because of the time.sleep(2)), but I get 10 seconds (= util.duration), which suggests that the signal is not stopped mid-task.
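The `util` module is not shown above. For anyone who wants to reproduce this, a minimal stand-in could look like the sketch below. Only the names (`ampl`, `freq`, `duration`, `sampling_rate`, `fct1`) and the 10-second duration come from the code and timing above; the other values and the body of `fct1` are assumptions, using a plain sine wave built with the standard library.

```python
import math

# Assumed parameter values; only duration = 10 s is implied by the timing above.
ampl = 1.0            # amplitude in volts (assumption)
freq = 2.0            # frequency in Hz (assumption)
duration = 10         # signal duration in seconds
sampling_rate = 1000  # samples per second (assumption)


def fct1(ampl, freq, duration, sampling_rate):
    """Return (dt, signal) for a sine wave sampled at sampling_rate.

    dt is the time step in seconds and signal is a list of floats.
    """
    n_samples = int(round(duration * sampling_rate))
    dt = 1.0 / sampling_rate
    signal = [ampl * math.sin(2 * math.pi * freq * i * dt) for i in range(n_samples)]
    return dt, signal


if __name__ == "__main__":
    dt, signal = fct1(ampl, freq, duration, sampling_rate)
    print("dt = {} s, {} samples".format(dt, len(signal)))
```

Saved as `util.py`, this gives a signal of `duration * sampling_rate` samples, which is what gets passed to `task.write` in the code above.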
Any help would be greatly appreciated!