Reputation: 11
I found a way to use a gamepad in Python, using the module Evdev (see link at the end). In this tutorial the author only uses one gamepad, but he also states, that it should be possible to use multiple gamespads with code basing on the following:
from evdev import InputDevice
from select import select
gamepad = InputDevice('/dev/input/event0')
while True:
r,w,x = select([gamepad], [], [])
for event in gamepad.read():
print(event)
Select.select seems to wait until a button is pressed, so that the program is interupted until that happens. How can I modify the code to use multiple gamepads or to execute other code while waiting for button-inputs? Or is there a better alternative to using evdev for that matter?
http://ericgoebelbecker.com/2015/06/raspberry-pi-and-gamepad-programming-part-1-reading-the-device/
Upvotes: 1
Views: 1456
Reputation: 1702
I also had this need (I have two BT gamepads). So, I wrote my code that does:
threading.Thread()
) that checks every second if they are connected to preset event paths /dev/input/event*
(16 and 20 are defaults, they can be changed). If it doesn't find them, it tries the next event (eg: 17/21). If it find one or both, it connects it/them. If not, it keep on checking every second.selector
as docs says.You can see my code here, but I suggest you to go to my Github for updates.
Click to expand#!/usr/bin/env python3
import time, argparse, subprocess
try:
from obs_api import client, consola, c2, c3, c, thread
except:
from obs_api_no_obs import client, consola, c2, c3, c, thread
import sys
from evdev import InputDevice, ecodes
from selectors import DefaultSelector, EVENT_READ
class Bt:
def __init__(self):
self.but = [307, 308, 305, 304, 315]
self.gamepad1, self.gamepad2 = None, None
self.selector = DefaultSelector()
self.devices_list = list()
self.devices_dict = dict()
self.bt_on = True
def bt_send_hat(self, path, que, val):
client.send_message('/bt', [int(path[-2:]), que, val])
c2(f'/bt, {int(path[-2:])}, {que}, {val}')
if val == 0: self.devices_dict[path] = 'c'
else: self.devices_dict[path] = que
def bt_send(self, path, que, val):
client.send_message('/bt', [int(path[-2:]), que, val])
c2(f'/bt, {int(path[-2:])}, {que}, {val}')
def reconnect(self):
device1 = '13:57:90:05:0E:31'
device2 = '13:6E:0E:07:0E:31'
ps1 = subprocess.Popen(['bluetoothctl', 'info', device1], stdout=subprocess.PIPE)
ps2 = subprocess.Popen(['bluetoothctl', 'info', device2], stdout=subprocess.PIPE)
stdout1 = subprocess.check_output(['grep', 'Connected'], stdin=ps1.stdout).decode("utf-8")
stdout2 = subprocess.check_output(['grep', 'Connected'], stdin=ps2.stdout).decode("utf-8")
if 'No' in stdout1 or 'no' in stdout1:
subprocess.Popen(['bluetoothctl', 'connect', device1])
c3(f'bluetoothctl connect {device1}')
if 'No' in stdout2 or 'no' in stdout2:
subprocess.Popen(['bluetoothctl', 'connect', device2])
c3(f'bluetoothctl connect {device2}')
def is_none(self, num, dev):
gamepad = f'gamepad{num}'
device = f'/dev/input/event{dev}'
try:
vars(self)[gamepad] = InputDevice(device)
try:
self.selector.unregister(vars(self)[gamepad])
except:
c3(f'Todavía no registrado {device}', c.azul)
try:
self.selector.register(vars(self)[gamepad], EVENT_READ)
c3(f'Registrado {device}', c.cian)
except:
c3(f'{device} already registred', c.cian)
except OSError as e:
c3(f'No está conectado {device}')
# Probando device + 1
dev += 1
device = f'/dev/input/event{dev}'
try:
vars(self)[gamepad] = InputDevice(device)
try: self.selector.unregister(vars(self)[gamepad])
except: c3(f'Todavía no registrado {device}', c.azul)
try:
self.selector.register(vars(self)[gamepad], EVENT_READ)
c3(f'Registrado {device}', c.cian)
except: c3(f'{device} already registred', c.cian)
except OSError as e:
c3(f'Ni tampoco... {device}')
def check_devices(self):
while self.bt_on:
# Si no están cargados, los intenta cargar y registrarlos en selector
if self.gamepad1 is None:
self.is_none(1, self.devices_list[0])
if self.gamepad2 is None:
self.is_none(2, self.devices_list[1])
time.sleep(1)
def input_bt(self, gp1, gp2):
self.devices_list = [gp1, gp2]
self.devices_dict= {f'/dev/input/event{gp1}':'c',
f'/dev/input/event{gp2}':'c'}
client.send_message('/bt_init', [gp1, gp2])
thread(self.check_devices)
time.sleep(2)
while self.bt_on:
# Si ninguno de los dos está cargado, vuelve a intentar conectarlos
if self.gamepad1 is None and self.gamepad2 is None:
c3('No está conectado ninguno')
time.sleep(1)
continue
# Revisa la lista de selector, esperando que llegue algo
for key, mask in self.selector.select():
device = key.fileobj
path = key.fileobj.path
# Intenta leer en device. Si salta error...
try:
for event in device.read():
et, ec, ev = event.type, event.code, event.value
if et == ecodes.EV_ABS:
# Analogo
if ec == 1: self.bt_send(path, 'h', -ev)
if ec == 0: self.bt_send(path, 'v', -ev)
if ec == 16 and ev == -1: self.bt_send_hat(path, 't', 1)
elif ec == 16 and ev == 1: self.bt_send_hat(path, 'b', 1)
elif ec == 17 and ev == -1: self.bt_send_hat(path, 'r', 1)
elif ec == 17 and ev == 1: self.bt_send_hat(path, 'l', 1)
if ec == 1 and ev == 0: self.bt_send_hat(path, 'r', 0)
if ec == 1 and ev == 0: self.bt_send_hat(path, 'l', 0)
if ec == 0 and ev == 0: self.bt_send_hat(path, 't', 0)
if ec == 0 and ev == 0: self.bt_send_hat(path, 'b', 0)
if et == ecodes.EV_KEY:
if ec == self.but[0]: self.bt_send(path, 0, ev)
elif ec == self.but[1]: self.bt_send(path, 1, ev)
elif ec == self.but[2]: self.bt_send(path, 2, ev)
elif ec == self.but[3]: self.bt_send(path, 3, ev)
elif ec == self.but[4]: self.bt_send(path, 4, ev)
# ... es porque el gamepad se apagó. Lo cierra y lo desregistra de selector
except OSError as e:
device.close()
c3('input_bt() - Except - Se apagó un gamepad')
if path[-2:] == '16':
c3(f'¿Se apagó /dev/input/event{self.devices_list[0]}? Desregistrándolo...')
if self.gamepad1 != None:
self.selector.unregister(self.gamepad1)
self.gamepad1 = None
if path[-2:] == '20':
c3(f'¿Se apagó /dev/input/event{self.devices_list[1]}? Desregistrándolo...')
if self.gamepad2 != None:
self.selector.unregister(self.gamepad2)
self.gamepad2 = None
# c4('input_bt() Fin de WHILE')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--gp1', type=int, default=16,help='Gamepad 1')
parser.add_argument('--gp2', type=int, default=20,help='Gamepad 2')
args = parser.parse_args()
bt = Bt()
try:
consola(f'"R": reconnect()', c.naranja)
consola(f'"Q": quit', c.naranja)
thread(bt.input_bt, [args.gp1, args.gp2])
while True:
tecla = input()
if tecla == 'r':
bt.reconnect()
elif tecla == 'q':
sys.exit()
except KeyboardInterrupt:
print(' Bye')
Also, this is the module that loads if you are not me 😁.
Click to expand#!/usr/bin/env python3
import random, threading
from pythonosc import udp_client
targetIp = "127.0.0.1"
targetPort = 10000
client = udp_client.SimpleUDPClient(targetIp, targetPort)
client.send_message("/init", 1)
class Gd:
def __init__(self) -> None:
self.gd = {'verbose': [True, True, True, True]}
globalDict = Gd()
class Color:
def __init__(self):
self.reset = '\x1b[0m'
self.blanco = '\x1b[97m'
self.negro = '\x1b[90m'
self.rojo = '\x1b[91m'
self.verde = '\x1b[92m'
self.azul = '\x1b[94m'
self.amarillo = '\x1b[93m'
self.magenta = '\x1b[95m'
self.magenta_bold = '\x1b[95;1m'
self.azul_bold = '\x1b[94;1m'
self.cian = '\x1b[96m'
self.naranja = '\x1b[38;5;202m'
self.violeta = '\x1b[38;5;129m'
self.rosa = '\x1b[38;5;213m'
self.ocre = '\x1b[38;5;172m'
self.marron = '\x1b[38;5;52m'
self.musgo = '\x1b[38;5;58m'
self.error = '\x1b[93;41m'
self.remoto = '\x1b[93;42m'
self.debug = '\x1b[93;44m'
self.lista_attrs = []
self.attrs = self.__dict__
for k, v in self.attrs.items():
if k not in ['lista_attrs', 'attrs', 'random']:
self.lista_attrs.append(v)
self.random = random.choice(self.lista_attrs)
c = Color()
# Threading
def thread(function, args=[]):
t = threading.Thread(
target=function,
args=(args),
name=f'{function}({args})',
daemon=True)
t.start()
def c1(texto, color_texto=c.azul_bold):
if globalDict.gd['verbose'][0]:
texto = str(texto)
print(color_texto, texto, c.reset)
def c2(texto, color_texto=c.azul):
if globalDict.gd['verbose'][1]:
texto = str(texto)
print(color_texto, texto, c.reset)
def c3(texto, color_texto=c.cian):
if globalDict.gd['verbose'][2]:
texto = str(texto)
print(color_texto, texto, c.reset)
def c4(texto, color_texto=c.rosa):
if globalDict.gd['verbose'][3]:
texto = str(texto)
print(color_texto, texto, c.reset)
def consola(texto, color_texto=c.verde):
texto = str(texto)
print(color_texto, texto, c.reset)
Upvotes: 0
Reputation: 1011
How can I modify the code to use multiple gamepads or to execute other code while waiting for button-inputs?
Check out the documentation for InputDevice.read
read()
Read multiple input events from device. Return a generator object that yieldsInputEvent
instances. RaisesBlockingIOError
if there are no available events at the moment.
Select will block until an input event is available. Instead, we can read events until we get BlockingIOError. And then continue to the next gamepad, or do any other work that needs to be done in the main loop.
You may also consider using InputDevice.read_one
read_one()
Read and return a single input event as an instance ofInputEvent
.Return
None
if there are no pending input events.
Upvotes: 1