Reputation: 17867
I'm trying to write a terminal application to interact with an Arduino microcontroller via pyserial. The following features are important:
In principle, this should be possible with cmd. But I'm struggling to print incoming messages once the user has started typing.
For simplicity, I wrote the following test script emulating incoming messages every second. Outgoing messages are just echoed back to the command line with the prefix ">":
#!/usr/bin/env python3
from cmd import Cmd
from threading import Thread
import time

class Prompt(Cmd):
    def default(self, inp):
        print('>', inp)

stop = False

def echo():
    while not stop:
        print(time.time())
        time.sleep(1)

thread = Thread(target=echo)
thread.daemon = True
thread.start()
try:
    Prompt().cmdloop()
except KeyboardInterrupt:
    stop = True
    thread.join()
In Spyder IDE, the result is just perfect:
But in iTerm2 (macOS) the output is pretty messed up:
Since I want to use this application from within Visual Studio Code, it should work outside Spyder. Do you have any idea how to get the same behaviour in iTerm2 as in Spyder?
Things I already considered or tried out:
Use the curses library. This solves my problem of printing text to different regions. But I'm losing endless scrolling, since curses defines its own fullscreen window.
Move the cursor using ANSI escape sequences. It might be a possible solution, but I can't get it to work. It always destroys the bottom line where the user is typing. I might need to adjust the scrolling region, which I haven't managed to do yet.
Use a different interpreter. I already tried Python vs. IPython, without success. It might be a more subtle setting in Spyder's interpreter.
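For reference, the scrolling-region idea from the second point can be sketched roughly as below. This is only a sketch under assumptions: the terminal is assumed to understand the common VT100 sequences (DECSC/DECRC, DECSTBM, CUP), the input line is assumed to sit on the bottom row of the window, and the helper names (`set_scroll_region`, `move_to`, `frame_incoming`) are made up for illustration. Whether lines scrolled out of such a region end up in the scrollback buffer depends on the terminal.

```python
ESC = "\x1b"
SAVE_CURSOR = ESC + "7"     # DECSC: remember the current cursor position
RESTORE_CURSOR = ESC + "8"  # DECRC: return to the remembered position
RESET_REGION = ESC + "[r"   # DECSTBM without arguments: whole screen scrolls again


def set_scroll_region(top: int, bottom: int) -> str:
    """Restrict scrolling to the rows top..bottom (1-based, inclusive)."""
    return f"{ESC}[{top};{bottom}r"


def move_to(row: int, col: int = 1) -> str:
    """Move the cursor to an absolute position (1-based)."""
    return f"{ESC}[{row};{col}H"


def frame_incoming(message: str, rows: int) -> str:
    """Build the byte sequence that prints `message` above the bottom row.

    Assumes the user's input line occupies row `rows` (the last row).
    """
    last = rows - 1  # last row of the scrolling region
    return (
        SAVE_CURSOR                   # keep the user's cursor position
        + set_scroll_region(1, last)  # bottom row is excluded from scrolling
        + move_to(last)               # go to the last row of the region
        + "\n" + message              # scroll the region up by one, then print
        + RESET_REGION                # give the whole screen back
        + RESTORE_CURSOR              # return to where the user was typing
    )
```

To try it, write the result to `sys.stdout` and flush, with `rows` taken from `shutil.get_terminal_size().lines`.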
Upvotes: 1
Views: 1507
Reputation: 17867
Yes! I found a solution: Prompt Toolkit 3.0 in combination with asyncio lets you handle this very problem using patch_stdout, "a context manager that ensures that print statements within it won’t destroy the user interface".
Here is a minimum working example:
#!/usr/bin/env python3
from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout
import asyncio
import time

async def echo():
    while True:
        print(time.time())
        await asyncio.sleep(1)

async def read():
    session = PromptSession()
    while True:
        with patch_stdout():
            line = await session.prompt_async("> ")
            print(line.upper())

loop = asyncio.get_event_loop()
loop.create_task(echo())
loop.create_task(read())
loop.run_forever()
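As far as I know, recent Python releases deprecate calling asyncio.get_event_loop() when no loop is running, so the same two-task structure can also be started through asyncio.run(). The following is a minimal sketch of that structure using only the standard library. `ticker` and `reader` are hypothetical stand-ins for echo() and read(); they append to a list instead of printing and prompting, so the sketch runs without a terminal or Prompt Toolkit.

```python
import asyncio


async def ticker(out: list, count: int, interval: float) -> None:
    # Stand-in for echo(): emits a message at a fixed interval.
    for i in range(count):
        out.append(f"tick {i}")
        await asyncio.sleep(interval)


async def reader(out: list, lines: list) -> None:
    # Stand-in for read(): handles one "typed" line at a time.
    for line in lines:
        await asyncio.sleep(0)  # yield to the event loop, as awaiting a prompt would
        out.append(line.upper())


async def main() -> list:
    out: list = []
    # gather() runs both coroutines concurrently on the loop created by asyncio.run().
    await asyncio.gather(ticker(out, 3, 0.01), reader(out, ["hello", "world"]))
    return out


result = asyncio.run(main())
```

In the real program, the two coroutines passed to gather() would be the echo() and read() functions shown earlier.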
Upvotes: 1
Reputation: 31156
It's been a while since I interacted with an Arduino from my Mac. I used pyserial and it was 100% reliable. The key is to use read_until(). I've included my wrapper class for illustration. (It also has an emulation mode for when I didn't have an Arduino.)
import serial  # pip install pyserial
from serial.tools import list_ports
import pty, os  # for creating a virtual serial interface
from serial import Serial
from typing import Optional

class SerialInterface:
    # constants that control how the class works
    FULLEMULATION = 0
    SERIALEMULATION = 1
    URLEMULATION = 2
    FULLSOLUTION = 3

    # private class-level variables
    __emulate: int = FULLEMULATION
    __ser: Serial
    __port: str = ""

    def __init__(self, emulate: int = FULLEMULATION, port: str = "") -> None:
        self.__buffer: list = []
        self.__emulate = emulate
        self.__port = port
        #self.listports()

        # set up the connection to the COM/serial port
        # emulation sets up a virtual port, but this has not been working
        if emulate == self.FULLSOLUTION:
            self.__ser = serial.Serial(port, 9600)
        elif emulate == self.SERIALEMULATION:
            master, slave = pty.openpty()
            serialport = os.ttyname(slave)
            self.__ser = serial.Serial(port=serialport, baudrate=9600, timeout=1)
        elif emulate == self.URLEMULATION:
            self.__ser = serial.serial_for_url("loop://")

    # useful to show the COM/serial ports on a computer
    @staticmethod
    def listports() -> list:
        ports = list(list_ports.comports())
        for p in ports:
            print(p, p.device)
        return ports

    def read_until(self, expected: bytes = b'\n', size: Optional[int] = None) -> bytes:
        if self.__emulate == self.FULLEMULATION:
            # pop from the front so data comes back in the order it was written
            return self.__buffer.pop(0)
        else:
            return self.__ser.read_until(expected, size)

    # note: it is important to end every write with \n so data can be read item by item
    def write(self, bytes: bytes = b'') -> None:
        if self.__emulate == self.FULLEMULATION:
            self.__buffer.append(bytes)
        else:
            self.__ser.write(bytes)

    def dataAvail(self) -> bool:
        if self.__emulate == self.FULLEMULATION:
            return len(self.__buffer) > 0
        else:
            # in_waiting replaces the deprecated inWaiting() in pyserial 3.x
            return self.__ser.in_waiting > 0

    def close(self) -> None:
        # no serial port is opened in full emulation mode
        if self.__emulate != self.FULLEMULATION:
            self.__ser.close()

    def mode(self) -> int:
        return self.__emulate
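To illustrate why the trailing \n on every write matters, here is a small sketch of what reading "until a terminator" does to a byte stream. It is not pyserial's implementation: the `read_until` function below is a simplified stand-in written for this example, and an in-memory io.BytesIO plays the role of the serial port.

```python
import io


def read_until(stream: io.BytesIO, expected: bytes = b"\n") -> bytes:
    """Read byte by byte until `expected` is seen or the stream ends."""
    buf = bytearray()
    while not buf.endswith(expected):
        chunk = stream.read(1)
        if not chunk:  # end of stream: return whatever was collected
            break
        buf += chunk
    return bytes(buf)


# Two messages written back to back, each terminated with a newline.
stream = io.BytesIO(b"temp=21\nhum=40\n")
first = read_until(stream)   # b"temp=21\n"
second = read_until(stream)  # b"hum=40\n"
third = read_until(stream)   # b"" (nothing left to read)
```

Without the terminator, the two messages would arrive as one undivided run of bytes and the reader could not tell where one item ends and the next begins.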
Upvotes: 0