bsteur
bsteur

Reputation: 11

Multiple Gamepads with Python Evdev?

I found a way to use a gamepad in Python using the evdev module (see the link at the end). In this tutorial the author only uses one gamepad, but he also states that it should be possible to use multiple gamepads with code based on the following:

from evdev import InputDevice
from select import select
# Open the event device node of the gamepad.
gamepad = InputDevice('/dev/input/event0')
while True:
    # No timeout is passed, so select() blocks here until the gamepad
    # has something to read.
    r,w,x = select([gamepad], [], [])
    # Print every event that is currently pending on the device.
    for event in gamepad.read():
        print(event)

select.select seems to wait until a button is pressed, so the program is blocked until that happens. How can I modify the code to use multiple gamepads, or to execute other code while waiting for button input? Or is there a better alternative to evdev for this?

http://ericgoebelbecker.com/2015/06/raspberry-pi-and-gamepad-programming-part-1-reading-the-device/

Upvotes: 1

Views: 1456

Answers (2)

Mario Mey
Mario Mey

Reputation: 1702

I also had this need (I have two BT gamepads). So, I wrote my code that does:

  • Creates a thread (threading.Thread()) that checks every second whether the gamepads are connected at the preset event paths /dev/input/event* (16 and 20 are the defaults; they can be changed). If it doesn't find one, it tries the next event number (e.g. 17/21). If it finds one or both, it connects them. If not, it keeps checking every second.
  • To connect more than one gamepad, it uses a selector, as the docs suggest.

You can see my code here, but I suggest you go to my GitHub for updates.

Click to expand
#!/usr/bin/env python3

import time, argparse, subprocess
try:
    from obs_api import client, consola, c2, c3, c, thread
except:
    from obs_api_no_obs import client, consola, c2, c3, c, thread
    import sys

from evdev import InputDevice, ecodes
from selectors import DefaultSelector, EVENT_READ

class Bt:
    """Forward input events from up to two evdev gamepads through ``client``.

    ``check_devices`` keeps trying to open the two configured
    ``/dev/input/event*`` nodes and registers them on a selector;
    ``input_bt`` waits on that selector and turns stick, hat and button
    events into ``/bt`` messages.
    """

    def __init__(self):
        # Key codes of the forwarded buttons; the position in this list is
        # the button index that is sent in the '/bt' message.
        self.but = [307, 308, 305, 304, 315]
        self.gamepad1, self.gamepad2 = None, None
        self.selector = DefaultSelector()
        # Event numbers for gamepad 1 and gamepad 2 (filled in by input_bt).
        self.devices_list = list()
        # Last hat direction reported per device path; 'c' is stored when a
        # hat release (value 0) is sent.
        self.devices_dict = dict()
        # Loop flag read by check_devices() and input_bt().
        self.bt_on = True

    @staticmethod
    def _event_number(path):
        """Return the numeric suffix of an event path.

        ``'/dev/input/event5'`` -> ``5``, ``'/dev/input/event16'`` -> ``16``.
        Taking all trailing digits (instead of the last two characters)
        keeps single-digit and three-digit event nodes working.

        Raises:
            ValueError: if ``path`` does not end in a digit.
        """
        digits = path[len(path.rstrip('0123456789')):]
        return int(digits)

    def bt_send_hat(self, path, que, val):
        """Send a hat message for ``path`` and remember the hat state.

        Args:
            path: device node path the event came from.
            que: direction tag that is sent ('t', 'b', 'r' or 'l').
            val: 1 for a press, 0 for a release.
        """
        self.bt_send(path, que, val)
        # A release resets the stored state to 'c'; a press stores the tag.
        self.devices_dict[path] = 'c' if val == 0 else que

    def bt_send(self, path, que, val):
        """Send ``[event number, que, val]`` to '/bt' and log the same data."""
        number = self._event_number(path)
        client.send_message('/bt', [number, que, val])
        c2(f'/bt, {number}, {que}, {val}')

    @staticmethod
    def _is_disconnected(address):
        """Return True when ``bluetoothctl info`` reports 'Connected: no'.

        Lines containing 'Connected' are searched for 'no'/'No', the same
        check that was previously delegated to ``grep``. When there is no
        such line at all the device is reported as not disconnected instead
        of raising.
        """
        result = subprocess.run(['bluetoothctl', 'info', address],
                                stdout=subprocess.PIPE)
        output = result.stdout.decode('utf-8')
        status_lines = [line for line in output.splitlines()
                        if 'Connected' in line]
        return any('No' in line or 'no' in line for line in status_lines)

    def reconnect(self):
        """Ask ``bluetoothctl`` to connect each gamepad that is disconnected.

        Both devices are queried first; a ``bluetoothctl connect`` process is
        then started (and not waited for) for each disconnected one.
        """
        device1 = '13:57:90:05:0E:31'
        device2 = '13:6E:0E:07:0E:31'
        pending = [address for address in (device1, device2)
                   if self._is_disconnected(address)]
        for address in pending:
            subprocess.Popen(['bluetoothctl', 'connect', address])
            c3(f'bluetoothctl connect {address}')

    def _open_and_register(self, attr, device):
        """Open ``device``, store it in ``self.<attr>`` and register it.

        Returns:
            False when the device node cannot be opened (OSError),
            True otherwise, even if the selector registration itself failed.
        """
        try:
            gamepad = InputDevice(device)
        except OSError:
            return False
        setattr(self, attr, gamepad)

        try:
            self.selector.unregister(gamepad)
        except (KeyError, ValueError):
            c3(f'Todavía no registrado {device}', c.azul)

        try:
            self.selector.register(gamepad, EVENT_READ)
            c3(f'Registrado {device}', c.cian)
        except (KeyError, ValueError, OSError):
            c3(f'{device} already registred', c.cian)
        return True

    def is_none(self, num, dev):
        """Try to load gamepad ``num`` from event ``dev``, then ``dev + 1``.

        Args:
            num: 1 or 2; selects the ``gamepad1``/``gamepad2`` attribute.
            dev: event number of the preferred device node.
        """
        attr = f'gamepad{num}'
        device = f'/dev/input/event{dev}'
        if self._open_and_register(attr, device):
            return
        c3(f'No está conectado {device}')

        # Try the next event number.
        device = f'/dev/input/event{dev + 1}'
        if not self._open_and_register(attr, device):
            c3(f'Ni tampoco...     {device}')

    def check_devices(self):
        """Once per second, try to load any gamepad that is not loaded."""
        while self.bt_on:
            # Gamepads that are not loaded are opened and registered on the
            # selector.
            if self.gamepad1 is None:
                self.is_none(1, self.devices_list[0])
            if self.gamepad2 is None:
                self.is_none(2, self.devices_list[1])
            time.sleep(1)

    def _handle_event(self, path, event):
        """Translate one input event from ``path`` into '/bt' messages."""
        et, ec, ev = event.type, event.code, event.value

        if et == ecodes.EV_ABS:
            # Analog stick: axes 1 and 0 are sent as 'h' and 'v' with the
            # sign of the value flipped.
            if ec == 1:
                self.bt_send(path, 'h', -ev)
            if ec == 0:
                self.bt_send(path, 'v', -ev)

            # Hat presses arrive on axes 16 and 17.
            if ec == 16 and ev == -1:
                self.bt_send_hat(path, 't', 1)
            elif ec == 16 and ev == 1:
                self.bt_send_hat(path, 'b', 1)
            elif ec == 17 and ev == -1:
                self.bt_send_hat(path, 'r', 1)
            elif ec == 17 and ev == 1:
                self.bt_send_hat(path, 'l', 1)

            # NOTE(review): releases are keyed on axes 1 and 0 rather than
            # 17 and 16; kept as written - confirm against the hardware.
            if ec == 1 and ev == 0:
                self.bt_send_hat(path, 'r', 0)
                self.bt_send_hat(path, 'l', 0)
            if ec == 0 and ev == 0:
                self.bt_send_hat(path, 't', 0)
                self.bt_send_hat(path, 'b', 0)

        if et == ecodes.EV_KEY and ec in self.but:
            self.bt_send(path, self.but.index(ec), ev)

    def _forget_device(self, device):
        """Unregister, clear and close a gamepad whose read failed.

        The slot is found by object identity, so this also works when the
        gamepad was opened on a non-default or fallback event number.
        """
        for attr in ('gamepad1', 'gamepad2'):
            if getattr(self, attr) is device:
                c3(f'¿Se apagó {device.path}? Desregistrándolo...')
                setattr(self, attr, None)
                break
        try:
            self.selector.unregister(device)
        except (KeyError, ValueError):
            # Already gone from the selector; nothing left to undo.
            pass
        device.close()

    def input_bt(self, gp1, gp2):
        """Run the read loop for the gamepads at event numbers gp1 and gp2.

        Sends '/bt_init', starts ``check_devices`` through ``thread`` and
        then loops while ``self.bt_on`` is true.

        Args:
            gp1: event number of gamepad 1.
            gp2: event number of gamepad 2.
        """
        self.devices_list = [gp1, gp2]
        self.devices_dict = {f'/dev/input/event{gp1}': 'c',
                             f'/dev/input/event{gp2}': 'c'}

        client.send_message('/bt_init', [gp1, gp2])

        thread(self.check_devices)

        time.sleep(2)
        while self.bt_on:
            # Nothing is loaded yet: wait for check_devices() to load one.
            if self.gamepad1 is None and self.gamepad2 is None:
                c3('No está conectado ninguno')
                time.sleep(1)
                continue

            # The timeout lets the loop re-check bt_on and the gamepad slots
            # even when no event arrives.
            for key, _mask in self.selector.select(timeout=1):
                device = key.fileobj
                path = device.path
                try:
                    for event in device.read():
                        self._handle_event(path, event)
                except BlockingIOError:
                    # Nothing to read right now; this is not a disconnect.
                    continue
                except OSError:
                    # The read failed because the gamepad was switched off.
                    c3('input_bt() - Except - Se apagó un gamepad')
                    self._forget_device(device)


if __name__ == '__main__':
    # Script entry point: parse the two event numbers, start the gamepad
    # reader and then serve the "r" (reconnect) / "q" (quit) console commands.

    # Imported here because the file-level `import sys` only runs in the
    # fallback branch of the obs_api import, which left `sys` undefined
    # whenever obs_api was importable.
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument('--gp1', type=int, default=16, help='Gamepad 1')
    parser.add_argument('--gp2', type=int, default=20, help='Gamepad 2')
    args = parser.parse_args()

    bt = Bt()

    try:
        consola('"R": reconnect()', c.naranja)
        consola('"Q": quit', c.naranja)

        thread(bt.input_bt, [args.gp1, args.gp2])

        while True:
            # The prompt advertises "R"/"Q", so accept either case.
            tecla = input().strip().lower()
            if tecla == 'r':
                bt.reconnect()
            elif tecla == 'q':
                # Let the reader loops see the stop flag before exiting.
                bt.bt_on = False
                sys.exit()
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or a closed stdin both end the program quietly.
        print(' Bye')

Also, this is the module that loads if you are not me 😁.

Click to expand
#!/usr/bin/env python3

import random, threading

from pythonosc import udp_client
# Host and port the UDP client sends to.
targetIp = "127.0.0.1"
targetPort = 10000
# Module-level client; it is created when this module is imported.
client = udp_client.SimpleUDPClient(targetIp, targetPort)
# An "/init" message with value 1 is sent right away, at import time.
client.send_message("/init", 1)

class Gd:
    """Holder for the module-wide settings dictionary ``gd``."""

    def __init__(self) -> None:
        # One flag per logging helper: index 0 is read by c1 ... index 3 by c4.
        self.gd = dict(verbose=[True] * 4)


globalDict = Gd()

class Color:
    """ANSI escape sequences for terminal output, exposed as attributes.

    ``lista_attrs`` collects every escape sequence in definition order,
    ``attrs`` is the instance ``__dict__`` itself and ``random`` is one
    sequence picked from ``lista_attrs`` when the instance is created.
    """

    def __init__(self):
        palette = {
            'reset': '\x1b[0m',
            'blanco': '\x1b[97m',
            'negro': '\x1b[90m',
            'rojo': '\x1b[91m',
            'verde': '\x1b[92m',
            'azul': '\x1b[94m',
            'amarillo': '\x1b[93m',
            'magenta': '\x1b[95m',
            'magenta_bold': '\x1b[95;1m',
            'azul_bold': '\x1b[94;1m',
            'cian': '\x1b[96m',
            'naranja': '\x1b[38;5;202m',
            'violeta': '\x1b[38;5;129m',
            'rosa': '\x1b[38;5;213m',
            'ocre': '\x1b[38;5;172m',
            'marron': '\x1b[38;5;52m',
            'musgo': '\x1b[38;5;58m',
            'error': '\x1b[93;41m',
            'remoto': '\x1b[93;42m',
            'debug': '\x1b[93;44m',
        }
        for name, code in palette.items():
            setattr(self, name, code)

        self.lista_attrs = []
        self.attrs = self.__dict__

        # Collect the escape sequences, leaving out the bookkeeping
        # attributes themselves.
        bookkeeping = ('lista_attrs', 'attrs', 'random')
        self.lista_attrs.extend(
            code for name, code in self.attrs.items()
            if name not in bookkeeping
        )

        self.random = random.choice(self.lista_attrs)


c = Color()

# Threading
def thread(function, args=None):
    """Run ``function(*args)`` in a daemon thread and return that thread.

    Args:
        function: callable to run in the new thread.
        args: positional arguments for ``function``; defaults to none.

    Returns:
        The started ``threading.Thread``, so callers can ``join()`` it.
        Earlier versions returned ``None``; callers that ignore the result
        are unaffected.
    """
    # A fresh list per call instead of a shared mutable default argument.
    call_args = [] if args is None else args
    t = threading.Thread(
        target=function,
        args=call_args,
        name=f'{function}({call_args})',
        daemon=True)
    t.start()
    return t

def c1(texto, color_texto=c.azul_bold):
    """Print ``texto`` in ``color_texto`` when verbose flag 0 is enabled."""
    if not globalDict.gd['verbose'][0]:
        return
    print(color_texto, str(texto), c.reset)

def c2(texto, color_texto=c.azul):
    """Print ``texto`` in ``color_texto`` when verbose flag 1 is enabled."""
    if not globalDict.gd['verbose'][1]:
        return
    print(color_texto, str(texto), c.reset)

def c3(texto, color_texto=c.cian):
    """Print ``texto`` in ``color_texto`` when verbose flag 2 is enabled."""
    if not globalDict.gd['verbose'][2]:
        return
    print(color_texto, str(texto), c.reset)

def c4(texto, color_texto=c.rosa):
    """Print ``texto`` in ``color_texto`` when verbose flag 3 is enabled."""
    if not globalDict.gd['verbose'][3]:
        return
    print(color_texto, str(texto), c.reset)

def consola(texto, color_texto=c.verde):
    """Always print ``texto`` in ``color_texto``, then the reset sequence."""
    print(color_texto, str(texto), c.reset)

Upvotes: 0

nondebug
nondebug

Reputation: 1011

How can I modify the code to use multiple gamepads or to execute other code while waiting for button-inputs?

Check out the documentation for InputDevice.read

read()
Read multiple input events from device. Return a generator object that yields InputEvent instances. Raises BlockingIOError if there are no available events at the moment.

Select will block until an input event is available. Instead, we can read events until we get BlockingIOError. And then continue to the next gamepad, or do any other work that needs to be done in the main loop.

You may also consider using InputDevice.read_one

read_one()
Read and return a single input event as an instance of InputEvent.

Return None if there are no pending input events.

Upvotes: 1

Related Questions