Reputation: 83
I am facing a problem using the pyusb library in Python. I have configured the libusb filter for the device. I am trying to read QR code scans, and whenever I manage to collect the data from a QR code I want to trigger a POST request. My code runs correctly, but only once: after the POST request I hear my computer play the sound of a USB device disconnecting and reconnecting, as if the QR code device were being reset. If I don't make the POST request, the code runs endlessly, which is the correct behavior.
Am I missing something here?
import usb.core
import usb.util
import requests
# USB vendor and product IDs used to locate the QR code reader.
# XXXX is a placeholder: replace each one with the device's real ID written
# as a hex integer literal (for example 0x1234).
VENDOR_ID = XXXX # Vendor ID in hex
PRODUCT_ID = XXXX # Product ID in hex
# Lookup table from the key code found in byte 2 of a scanner report to the
# character it stands for: codes 4-29 are 'a'-'z', 30-38 are '1'-'9', 39 is
# '0', and 45 is '-'.
# NOTE(review): these values look like USB HID keyboard usage IDs - confirm.
mapping = dict(enumerate('abcdefghijklmnopqrstuvwxyz1234567890', start=4))
mapping[45] = '-'
# Find the USB device that matches the vendor/product IDs above; abort with
# ValueError when find() returns None (no such device)
dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
if dev is None:
    raise ValueError("QR Code Scanner not found")
# Set the active configuration
dev.set_configuration()
# Claim the interface so this program can read from its endpoints
# NOTE(review): cfg[(0, 0)] is presumably interface 0, alternate setting 0
# (PyUSB tuple indexing) - confirm against the device's descriptors
cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]
usb.util.claim_interface(dev, intf)
# Endpoint to read from: the first endpoint of the claimed interface
# NOTE(review): assumes that endpoint is the scanner's IN endpoint - confirm
endpoint = intf[0]
# API URL that scanned codes are POSTed to
api_url = "https://my-website/api/membership-manager/checkins"
# To store the concatenated code values: characters collected so far for the
# scan in progress (read_qr_code appends to it and resets it after 36 chars)
code_values = []
def post_qr_code(qr_code, timeout=10):
    """Send a scanned QR code to the check-in API and print the outcome.

    Args:
        qr_code: The decoded QR code text; sent as the ``userId`` field of
            the JSON payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely, which would stall
            whoever called this function.

    Returns:
        None. The parsed JSON response is printed on success; request
        failures are printed rather than raised.
    """
    headers = {
        'Provider': 'UUID',
        'Content-Type': 'application/json'
    }
    payload = {
        'userId': qr_code
    }
    try:
        response = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx and 5xx)
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as error:
        # ValueError covers a non-JSON response body on requests versions
        # whose JSON decode error is not a RequestException subclass.
        print('There was an error!', error)
    else:
        print('RESULT', data)
def read_qr_code():
    """Read one report from the scanner and fold its key code into the scan.

    Byte 2 of the report is translated through ``mapping`` and appended to
    the module-level ``code_values`` buffer. Once the buffer holds 36
    characters they are joined, printed, passed to ``post_qr_code`` and the
    buffer is reset. Reports whose byte 2 is 0 or has no entry in
    ``mapping`` are ignored.

    Returns:
        None. USB errors are printed (when they carry an errno) instead of
        being raised.
    """
    global code_values

    try:
        # Read data from the USB endpoint (waits up to 5000 ms)
        report = dev.read(endpoint.bEndpointAddress, endpoint.wMaxPacketSize, timeout=5000)
    except usb.core.USBError as err:
        # Report the error and return so the caller decides whether to retry
        if err.errno is not None:
            print(f"USB Error: {str(err)}")
        return

    if len(report) <= 2:
        print("Data length is less than expected.")
        return

    key_code = report[2]
    # A zero key code carries no character
    if key_code == 0:
        return

    # Translate the key code; '?' marks a code that is not in the table
    char = mapping.get(key_code, '?')
    if char == '?':
        return
    code_values.append(char)

    # A complete code is exactly 36 characters long
    if len(code_values) != 36:
        return
    scanned = ''.join(code_values)
    print(f"QR Code Scanned: {scanned}")
    post_qr_code(scanned)
    code_values = []  # Start collecting the next scan from scratch
if __name__ == "__main__":
    # Poll the scanner until the user interrupts the program or an
    # unexpected error occurs.
    try:
        try:
            print("Waiting for QR code scan...")
            while True:
                read_qr_code()
        finally:
            # Release the interface and free the device's resources no
            # matter how the loop exits, so cleanup is written only once.
            usb.util.release_interface(dev, intf)
            usb.util.dispose_resources(dev)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; cleanup has already run above
        print("Program terminated and USB interface released.")
    except Exception as e:
        # Handle any other exceptions that occur (including cleanup failures)
        print(f"An unexpected error occurred: {e}")
Upvotes: 0
Views: 37
Reputation: 83
It turns out that I needed to use threading.
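Adding the following (and calling `post_in_thread(qr_code)` instead of `post_qr_code(qr_code)` from the read loop, so that the HTTP request no longer blocks the USB reads) solved my problem: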
import threading
def post_qr_code(qr_code, timeout=10):
    """Send a scanned QR code to the check-in API and print the outcome.

    Args:
        qr_code: The decoded QR code text; sent as the ``userId`` field of
            the JSON payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely, which would keep
            the calling thread alive forever.

    Returns:
        None. The parsed JSON response is printed on success; request
        failures are printed rather than raised.
    """
    headers = {
        'Provider': 'UUID',
        'Content-Type': 'application/json'
    }
    payload = {
        'userId': qr_code
    }
    try:
        response = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx and 5xx)
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as error:
        # ValueError covers a non-JSON response body on requests versions
        # whose JSON decode error is not a RequestException subclass.
        print('There was an error!', error)
    else:
        print('RESULT', data)
def post_in_thread(qr_code):
    """Run ``post_qr_code(qr_code)`` on a new thread and return immediately.

    Args:
        qr_code: The scanned code, forwarded unchanged to ``post_qr_code``.

    Returns:
        None. The started thread is not joined or returned.
    """
    worker = threading.Thread(target=post_qr_code, args=(qr_code,))
    worker.start()
Upvotes: 0