hobbit

Reputation: 63

MicroPython bluetooth connection to an OBD BLE

I am trying to connect a Raspberry Pi Pico W to an OBD2 adapter over BLE. Detection and connection seem to work, but now I'm stuck on how to send commands such as ATZ and ATD to the adapter.

For the project I am using MicroPython and the aioble library.

If I understand correctly, I am acting as the central and connecting to a peripheral device.

To send an AT command I am using the characteristic's write function (the code below shows exactly what I am doing), but how can I check the response received from the adapter?

Since the service uses two different UUIDs for writing and reading, should I create two different characteristics, one for reading and one for writing?

import aioble
import bluetooth
import machine
import uasyncio as asyncio


_OBD_UUID_SERVICE = bluetooth.UUID(0xfff0)
_OBD_UUID_CHARACTERISTIC = bluetooth.UUID(0xfff1)

led = machine.Pin("LED", machine.Pin.OUT)
connected = False
alive = False


async def find_remote():
    async with aioble.scan(duration_ms=5000, interval_us=30000, window_us=30000, active=True) as scanner:
        async for result in scanner:
            if (result.name() == "OBDBLE"):
                print("Found OBDBLE")
                for item in result.services():
                    print(item)
                if _OBD_UUID_SERVICE in result.services():
                    print("Found OBDBLE service")
                    return result.device
    return None


async def blink_task():
    print("Blink task started")
    toggle = True

    while True and alive:
        blink = 250
        led.value(toggle)
        toggle = not toggle

        if connected:
            blink = 1000
        else:
            blink = 250
        await asyncio.sleep_ms(blink)

    print("Blink task stopped")


async def peripheral_task():

    global connected, alive
    connected = False
    device = await find_remote()

    if not device:
        print("OBD device not found")
        return

    try:
        print("Connecting to", device)
        connection = await device.connect()

    except asyncio.TimeoutError:
        print("Timeout during connection")
        return

    async with connection:
        print("Connected")
        alive = True
        connected = True

   
        try:

            obd_service = await connection.service(_OBD_UUID_SERVICE)
            print(obd_service)
            obd_characteristic = await obd_service.characteristic(_OBD_UUID_CHARACTERISTIC)
            print(obd_characteristic)

        except asyncio.TimeoutError:
            print("Timeout discovering services/characteristics")
            return

    
        try:
            if obd_service == None:
                print("Obd Disconnected")
                alive = False
                connected = False

            if obd_characteristic == None:
                print("No Control")
                alive = False
                connected = False

        except asyncio.TimeoutError:
            print("Timeout during discovery/service/characteristic")
            alive = False
        
        print("Sending ATZ")
        obd_characteristic.write(b"ATZ",response=True)
        obd_characteristic.notified()
        data = await obd_characteristic.read()
        print(data)


    await connection.disconnected()
    print("Disconnected")


async def main():
    tasks = []
    tasks = [
        asyncio.create_task(blink_task()),
        asyncio.create_task(peripheral_task())
    ]

    await asyncio.gather(*tasks)

while True:
    asyncio.run(main())

This is the output:

Blink task started
Found OBDBLE
Found OBDBLE
UUID(0xfff0)
Found OBDBLE service
Connecting to Device(ADDR_PUBLIC, aa:bb:cc:12:22:33)
Connected
Service: 19 24 UUID(0xfff0)
Characteristic: 22 21 18 UUID(0xfff1)
Sending ATZ
b''
Disconnected

I found the UUID using an app. I will attach the image so you can check.

[Image: odb_details]

Upvotes: 1

Views: 557

Answers (1)

DrMickeyLauer

Reputation: 4684

This is a very broad question. Some aspects are already answered in https://stackoverflow.com/a/52139765/415982, in particular that you need to probe which characteristics to use and how many there are. This is vendor-specific.
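As a rough sketch of that probing step, assuming the adapter exposes its capabilities through the standard GATT property bits: enumerate the characteristics of the service and pick one that accepts writes and one that notifies. The function name `pick_characteristics` and the `(uuid, properties)` tuple layout are made up for illustration, and the example UUIDs are hypothetical, not taken from your adapter. With aioble, the pairs could be gathered by iterating over the service's characteristics and reading each one's `properties`.

```python
# Standard GATT characteristic property bits.
PROP_READ = 0x02
PROP_WRITE_NO_RESPONSE = 0x04
PROP_WRITE = 0x08
PROP_NOTIFY = 0x10
PROP_INDICATE = 0x20


def pick_characteristics(chars):
    """Pick (write_uuid, notify_uuid) from an iterable of (uuid, properties).

    Either value may be None if nothing suitable was found. Both may be the
    same UUID when the adapter uses a single characteristic for both roles.
    """
    write_uuid = None
    notify_uuid = None
    for uuid, props in chars:
        # First characteristic that accepts writes (with or without response).
        if write_uuid is None and props & (PROP_WRITE | PROP_WRITE_NO_RESPONSE):
            write_uuid = uuid
        # First characteristic that can push data to the central.
        if notify_uuid is None and props & (PROP_NOTIFY | PROP_INDICATE):
            notify_uuid = uuid
    return write_uuid, notify_uuid


# Hypothetical layout: one characteristic notifies, another accepts writes.
example = [(0xFFF1, PROP_READ | PROP_NOTIFY), (0xFFF2, PROP_WRITE)]
write_uuid, notify_uuid = pick_characteristics(example)
print(hex(write_uuid), hex(notify_uuid))
```

If both values come back as the same UUID, a single characteristic serves both directions; if they differ, you need two characteristic objects.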

As for the actual communication, you need to write a queue abstraction with an API that lets you send a command, waits for the response to be assembled (you may well receive only partial answers at a time), and then delivers the whole answer to the caller.
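The assembly half of such an abstraction could look roughly like this. It is only a sketch: it assumes the adapter behaves like an ELM327 and ends every reply with the `>` prompt, and the class name `ResponseAssembler` and the sample bytes are invented for illustration.

```python
class ResponseAssembler:
    """Collects partial chunks until the ELM327 prompt '>' arrives."""

    PROMPT = b">"

    def __init__(self):
        self._buffer = b""

    def feed(self, chunk):
        """Add one chunk; return a list of complete responses (may be empty).

        Each response is a list of non-empty lines, without line endings.
        """
        self._buffer += chunk
        responses = []
        end = self._buffer.find(self.PROMPT)
        while end != -1:
            raw = self._buffer[:end]
            # Keep whatever follows the prompt for the next response.
            self._buffer = self._buffer[end + 1:]
            lines = [line.strip() for line in raw.split(b"\r")]
            responses.append([line for line in lines if line])
            end = self._buffer.find(self.PROMPT)
        return responses


# Hypothetical reply to ATZ arriving in two pieces.
assembler = ResponseAssembler()
print(assembler.feed(b"ATZ\r\r\rELM327 v1"))   # nothing complete yet
print(assembler.feed(b".5\r\r>"))              # prompt seen, reply delivered
```

With aioble, the chunks would typically come from `await characteristic.notified()` after subscribing with `await characteristic.subscribe(notify=True)`. A sender task can then write one command, wait until `feed` returns a complete response, hand it to the caller, and only then write the next command.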

Don't forget that ELM327-derived protocols require commands to be terminated with \r.
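A small helper keeps the terminator from being forgotten. The name `frame_command` is made up for this sketch; it accepts text or bytes and makes sure the result ends in exactly one carriage return.

```python
def frame_command(command):
    """Return the command as bytes, terminated with a single carriage return."""
    if isinstance(command, str):
        command = command.encode()
    # Drop any line ending the caller already added, then append exactly one \r.
    return command.rstrip(b"\r\n") + b"\r"


print(frame_command("ATZ"))
```

In the question's code this would mean writing `frame_command("ATZ")` instead of `b"ATZ"`.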

Upvotes: 0
