vkatsou
vkatsou

Reputation: 83

pyusb USB disconnecting on request

I am facing a problem using the pyusb library in python. I have configured the filter on libusb. I am trying to replicate a qr code scan, and whenever I manage to collect the data from the QR code I want to trigger a post request. My code runs correctly, but runs 1 time and after the post request I am hearing from my computer libusb the sound like disconnecting and reconnecting the qr code device. If I don't make the post request, the code runs endlessly which is the correct behavior.

Am I missing something here?

import usb.core
import usb.util
import requests

# USB QR code reader vendor and product IDs
VENDOR_ID = XXXX  # Vendor ID in hex
PRODUCT_ID = XXXX  # Product ID in hex

# Create a mapping dictionary
mapping = {
    4: 'a', 5: 'b', 6: 'c', 7: 'd', 8: 'e', 9: 'f', 10: 'g', 11: 'h', 12: 'i', 13: 'j',
    14: 'k', 15: 'l', 16: 'm', 17: 'n', 18: 'o', 19: 'p', 20: 'q', 21: 'r', 22: 's', 23: 't',
    24: 'u', 25: 'v', 26: 'w', 27: 'x', 28: 'y', 29: 'z', 30: '1', 31: '2', 32: '3', 33: '4',
    34: '5', 35: '6', 36: '7', 37: '8', 38: '9', 39: '0', 45: '-'
}

# Find the USB device
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)

if dev is None:
    raise ValueError("QR Code Scanner not found")

# Set the active configuration
dev.set_configuration()

# Claim the interface
cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]
usb.util.claim_interface(dev, intf)

# Endpoint to read from
endpoint = intf[0]

# API URL
api_url = "https://my-website/api/membership-manager/checkins"

# To store the concatenated code values
code_values = []


def post_qr_code(qr_code):
    headers = {
        'Provider': 'UUID',
        'Content-Type': 'application/json'
    }

    payload = {
        'userId': qr_code
    }

    try:
        response = requests.post(api_url, json=payload, headers=headers)
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx and 5xx)

        data = response.json()
        print('RESULT', data)

    except requests.exceptions.RequestException as error:
        print('There was an error!', error)


def read_qr_code():
    global code_values

    try:
        # Read data from the USB endpoint
        data = dev.read(endpoint.bEndpointAddress, endpoint.wMaxPacketSize, timeout=5000)

        if len(data) > 2:
            code_value = data[2]

            # Skip processing if data[2] is 0
            if code_value == 0:
                return

            # Convert code_value to character using the dictionary
            qr_code_char = mapping.get(code_value, '?')

            if qr_code_char != '?':
                code_values.append(qr_code_char)

            # Check if the concatenated string length is 36
            if len(code_values) == 36:
                qr_code = ''.join(code_values)
                print(f"QR Code Scanned: {qr_code}")
                post_qr_code(qr_code)
                code_values = []  # Clear the list after processing

        else:
            print("Data length is less than expected.")

    except usb.core.USBError as e:
        # Handle USB errors and avoid re-attempting in a loop
        if e.errno != None:
            print(f"USB Error: {str(e)}")


if __name__ == "__main__":
    try:
        print("Waiting for QR code scan...")
        while True:
            read_qr_code()
    except KeyboardInterrupt:
        # Release the interface when the program ends
        usb.util.release_interface(dev, intf)
        usb.util.dispose_resources(dev)
        print("Program terminated and USB interface released.")
    except Exception as e:
        # Handle any other exceptions that occur
        print(f"An unexpected error occurred: {e}")
        usb.util.release_interface(dev, intf)
        usb.util.dispose_resources(dev)

Upvotes: 0

Views: 37

Answers (1)

vkatsou
vkatsou

Reputation: 83

It turns out that I needed to use threading.

Adding the following solved my problem:

import threading
def post_qr_code(qr_code):
headers = {
    'Provider': 'UUID',
    'Content-Type': 'application/json'
}

payload = {
    'userId': qr_code
}

try:
    response = requests.post(api_url, json=payload, headers=headers)
    response.raise_for_status()  # Raises HTTPError for bad responses (4xx and 5xx)

    data = response.json()
    print('RESULT', data)

except requests.exceptions.RequestException as error:
    print('There was an error!', error)


def post_in_thread(qr_code):
    threading.Thread(target=post_qr_code, args=(qr_code,)).start()

Upvotes: 0

Related Questions