Reputation: 1
I'm currently running into a problem with trying to write to a serial device using pySerial. I want to be able to continuously update my terminal by reading the port and handle serial device writing on a seperate thread, meanwhile also be able to send a command via user input on the main thread. Everything runs as expected, except for that when I send one of the commands (cmdA or cmdB), the serial's output that I'm reading does not change (this is expected behaviour as the commands being sent alter the state of the device, which in turn changes the device's output that the serial port is reading). With all that said, it seems that the device is not receiving the command I am sending, even though the code continues to run with no exception and all functions seem to be executing as written.
Here is my current code:
A SerialMonitor class that can read the serial port and print out a specific amount of bytes once finding a set of "syncbytes"
# SerialMonitorTool.py
import threading
import time
import serial
class SerialMonitor(threading.Thread):
SYNC_BYTES = b'\x90\xeb'
def __init__(self, device='/dev/ttyUSB0', baudrate=115200, timeout=5):
print("Initializing Serial Monitor")
self._running = False
self._name = 'SerialMonitorThread-{}'.format(device)
self._device = serial.Serial(device, baudrate=baudrate, timeout=timeout)
self._write_lock = threading.Lock()
super().__init__(name=self._name)
def write(self, user_input, encode=False, terminator=None):
print("Locking for CMD Write...")
self._write_lock.acquire()
tx = user_input + terminator if terminator else user_input
print(f"Writing CMD to device: {tx}")
self._device.write(tx.encode() if encode else tx)
print("CMD Written...")
self._write_lock.release()
print("CMD Write Lock Released...")
def stop(self):
self._running = False
print('stop thread: ' + threading.current_thread().getName())
self.join()
def run(self):
print('starting thread: ' + threading.current_thread().getName())
self._running = True
try:
while self._running:
self._device.reset_input_buffer()
self._device.read_until(self.SYNC_BYTES)
ser_bytes = self._device.read(35)
print(f'\r{ser_bytes}', end='', flush=True)
time.sleep(0.25)
finally:
self._device.close()
and the main thread
# SerialMain.py
from SerialMonitorTool import *
cmdA = b'\x90\xeb\x01'
cmdB = b'\x90\xeb\x02'
monitor: SerialMonitor()
def print_help():
print('Usage: cmd [ a | b ]')
def send_cmd(cmd):
monitor.write(cmd)
def main():
monitor.start()
while True:
try:
user_input = input()
if user_input == '?' or user_input == 'h' or user_input == 'help':
print_help()
elif user_input == 'q' or user_input == 'quit':
break
elif user_input.startswith('cmd '):
cmd_type = user_input[len('cmd '):].split(' ')
if cmd_type[0] == 'a':
send_cmd(cmdA)
elif cmd_type[0] == 'b':
send_cmd(cmdB)
except Exception as e:
print(e)
monitor.stop()
def process_args():
# process arguments
import argparse
parser = argparse.ArgumentParser(description='Serial Test Tool')
parser.add_argument(
'-D', '--device',
help='Use the specified serial device.',
default='/dev/ttyUSB0',
type=str
)
global monitor
monitor = SerialMonitor()
if __name__ == "__main__":
process_args()
main()
Upvotes: 0
Views: 2845
Reputation: 34
It looks like there is issue in your write method, try to comment all the lock related code in write method or put lock syntax in below sequence.
def write(self, user_input, encode=False, terminator=None):
tx = user_input + terminator if terminator else user_input
print(f"Writing CMD to device: {tx}")
self._device.write(tx.encode() if encode else tx)
print("CMD Written...")
print("Locking for CMD Write...")
self._write_lock.acquire()
self._write_lock.release()
print("CMD Write Lock Released...")
Upvotes: 0