Falko
Falko

Reputation: 17867

How to write a Python terminal application with a fixed input line?

I'm trying to write a terminal application to interact with an Arduino microcontroller via pyserial. The following features are important:

In principle, this should be possible with cmd. But I'm struggling with printing incoming messages, when the user started typing.

For simplicity, I wrote the following test script emulating incoming messages every second. Outgoing messages are just echoed back to the command line with the prefix ">":

#!/usr/bin/env python3
from cmd import Cmd
from threading import Thread
import time

class Prompt(Cmd):

    def default(self, inp):

        print('>', inp)

stop = False

def echo():
    
    while not stop:
        
        print(time.time())
        time.sleep(1)

thread = Thread(target=echo)
thread.daemon = True
thread.start()

try:
    Prompt().cmdloop()
except KeyboardInterrupt:
    stop = True
    thread.join()

In Spyder IDE, the result is just perfect:

Result in Spyder IDE

But in iterm2 (Mac OS) the output is pretty messed up:

Result in iterm2

Since I want to use this application from within Visual Studio Code, it should work outside Spyder. Do you have any idea how to get the same behaviour in iterm2 as in Spyder?

Things I already considered or tried out:

Upvotes: 1

Views: 1507

Answers (2)

Falko
Falko

Reputation: 17867

Yes! I found a solution: The Prompt Toolkit 3.0 in combination with asyncio lets you handle this very problem using patch_stdout, "a context manager that ensures that print statements within it won’t destroy the user interface".

Here is a minimum working example:

#!/usr/bin/env python3
from prompt_toolkit import PromptSession
from prompt_toolkit.patch_stdout import patch_stdout
import asyncio
import time

async def echo():

    while True:
        
        print(time.time())
        await asyncio.sleep(1)

async def read():

    session = PromptSession()

    while True:
 
        with patch_stdout():
            line = await session.prompt_async("> ")
            print(line.upper())

loop = asyncio.get_event_loop()
loop.create_task(echo())
loop.create_task(read())
loop.run_forever()

screencast

Upvotes: 1

Rob Raymond
Rob Raymond

Reputation: 31156

It's a while since I was interacting with an Arduino with my Mac. I used pyserial and it was 100% reliable. key is user read_until(). I've included my wrapper class for illustration. (Also has an emulation mode for when I didn't have a Arduino)

import serial # pip install PySerial
from serial.tools import list_ports
import pty, os # for creating virtual serial interface
from serial import Serial
from typing import Optional


class SerialInterface:
    # define constants which control how class works
    FULLEMULATION=0
    SERIALEMULATION=1
    URLEMULATION=2
    FULLSOLUTION=3

    # define private class level variables
    __emulate:int = FULLEMULATION
    __ser:Serial
    __port:str = ""


    def __init__(self, emulate:int=FULLEMULATION, port:str="") -> None:
        self.__buffer:list = []
        self.__emulate = emulate
        self.__port = port

        #self.listports()

        # setup connection to COM/serial port
        # emulation sets up a virtual port,  but this has not been working
        if emulate == self.FULLSOLUTION:
            self.__ser = serial.Serial(port, 9600)
        elif emulate == self.SERIALEMULATION:
            master, slave = pty.openpty()
            serialport = os.ttyname(slave)

            self.__ser = serial.Serial(port=serialport, baudrate=9600, timeout=1)
        elif emulate == self.URLEMULATION:
            self.__ser = serial.serial_for_url("loop://")


    # useful to show COM/serial ports on a computer
    @staticmethod
    def listports() -> list:
        for p in serial.tools.list_ports.comports():
            print(p, p.device)
            serialport = p.device
        return serial.tools.list_ports.comports()

    def read_until(self, expected:bytes=b'\n', size:Optional[int]=None) -> bytes:
        if self.__emulate == self.FULLEMULATION:
            return self.__buffer.pop()
        else:
            return self.__ser.read_until(expected, size)

    # note it is important to have \n on end of every write to allow data to be read item by item
    def write(self, bytes:bytes=b'') -> None:
        if self.__emulate == self.FULLEMULATION:
            self.__buffer.append(bytes)
        else:
            self.__ser.write(bytes)

    def dataAvail(self) -> bool:
        if self.__emulate == self.FULLEMULATION:
            return len(self.__buffer) > 0
        else:
            return self.__ser.inWaiting() > 0

    def close(self) -> None:
        self.__ser.close()

    def mode(self) -> int:
        return self.__emulate




Upvotes: 0

Related Questions