Reputation: 63
I am trying to connect a Raspberry Pi Pico W to an OBD2 adapter using BLE.
Detection of and connection to the OBD adapter seem to work, but now I'm stuck on how to send it commands such as ATZ, ATD, etc.
For the project I am using MicroPython and the aioble library.
If I understand how it works correctly, I am acting as a central and trying to connect to a peripheral device.
To send the AT command I am using the write function (the code below makes it clearer what I am doing), but how can I check the response received from the OBD adapter?
Since the service uses two different UUIDs for writing and reading, should I create two different characteristics, one for reading and one for writing?
import aioble
import bluetooth
import machine
import uasyncio as asyncio

_OBD_UUID_SERVICE = bluetooth.UUID(0xfff0)
_OBD_UUID_CHARACTERISTIC = bluetooth.UUID(0xfff1)

led = machine.Pin("LED", machine.Pin.OUT)
connected = False
alive = False

async def find_remote():
    async with aioble.scan(duration_ms=5000, interval_us=30000, window_us=30000, active=True) as scanner:
        async for result in scanner:
            if (result.name() == "OBDBLE"):
                print("Found OBDBLE")
                for item in result.services():
                    print(item)
                if _OBD_UUID_SERVICE in result.services():
                    print("Found OBDBLE service")
                    return result.device
    return None

async def blink_task():
    print("Blink task started")
    toggle = True
    while True and alive:
        blink = 250
        led.value(toggle)
        toggle = not toggle
        if connected:
            blink = 1000
        else:
            blink = 250
        await asyncio.sleep_ms(blink)
    print("Blink task stopped")

async def peripheral_task():
    global connected, alive
    connected = False
    device = await find_remote()
    if not device:
        print("OBD device not found")
        return
    try:
        print("Connecting to", device)
        connection = await device.connect()
    except asyncio.TimeoutError:
        print("Timeout during connection")
        return
    async with connection:
        print("Connected")
        alive = True
        connected = True
        try:
            obd_service = await connection.service(_OBD_UUID_SERVICE)
            print(obd_service)
            obd_characteristic = await obd_service.characteristic(_OBD_UUID_CHARACTERISTIC)
            print(obd_characteristic)
        except asyncio.TimeoutError:
            print("Timeout discovering services/characteristics")
            return
        try:
            if obd_service == None:
                print("Obd Disconnected")
                alive = False
                connected = False
            if obd_characteristic == None:
                print("No Control")
                alive = False
                connected = False
        except asyncio.TimeoutError:
            print("Timeout during discovery/service/characteristic")
            alive = False
        print("Sending ATZ")
        obd_characteristic.write(b"ATZ",response=True)
        obd_characteristic.notified()
        data = await obd_characteristic.read()
        print(data)
        await connection.disconnected()
        print("Disconnected")

async def main():
    tasks = []
    tasks = [
        asyncio.create_task(blink_task()),
        asyncio.create_task(peripheral_task())
    ]
    await asyncio.gather(*tasks)

while True:
    asyncio.run(main())
This is the output:
Blink task started
Found OBDBLE
Found OBDBLE
UUID(0xfff0)
Found OBDBLE service
Connecting to Device(ADDR_PUBLIC, aa:bb:cc:12:22:33)
Connected
Service: 19 24 UUID(0xfff0)
Characteristic: 22 21 18 UUID(0xfff1)
Sending ATZ
b''
Disconnected
I found the UUIDs using an app; I will attach the image so you can check.
Upvotes: 1
Views: 557
Reputation: 4684
This is a very broad question. Some aspects are already answered in https://stackoverflow.com/a/52139765/415982, in particular that you need to probe which characteristics to use and how many there are (one shared characteristic, or separate ones for writing and for receiving). This is vendor-specific.
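To make the probing step concrete, here is a minimal sketch that picks a write channel and a notify channel from a list of discovered characteristics by looking at their GATT property bits. The function name `pick_channels` and the example UUID layout (0xFFF1 for notifications, 0xFFF2 for writes) are assumptions for illustration only; your adapter may expose something different.

```python
# GATT characteristic property bits (standard Bluetooth values).
PROP_READ = 0x02
PROP_WRITE_NO_RESPONSE = 0x04
PROP_WRITE = 0x08
PROP_NOTIFY = 0x10
PROP_INDICATE = 0x20


def pick_channels(characteristics):
    """Pick (write_uuid, notify_uuid) from (uuid, properties) pairs.

    Hypothetical helper: returns the first writable and the first
    notifying characteristic; either element is None if not found.
    """
    write_uuid = None
    notify_uuid = None
    for uuid, props in characteristics:
        if write_uuid is None and props & (PROP_WRITE | PROP_WRITE_NO_RESPONSE):
            write_uuid = uuid
        if notify_uuid is None and props & (PROP_NOTIFY | PROP_INDICATE):
            notify_uuid = uuid
    return write_uuid, notify_uuid


# Assumed layout with two characteristics (not taken from the question).
two_chars = [
    (0xFFF1, PROP_READ | PROP_NOTIFY),
    (0xFFF2, PROP_WRITE | PROP_WRITE_NO_RESPONSE),
]
print(pick_channels(two_chars))

# Assumed layout where one characteristic does both jobs.
one_char = [(0xFFE1, PROP_WRITE_NO_RESPONSE | PROP_NOTIFY)]
print(pick_channels(one_char))
```

With aioble, the pairs could probably be collected by iterating `async for c in obd_service.characteristics()` and reading each characteristic's `uuid` and `properties`, but check that against the aioble version you have installed.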
As for the actual communication, you need to write a queue abstraction with an API that lets you send a command, waits for the response to be assembled (it may well arrive in several partial chunks), and then delivers the whole answer to the caller.
Don't forget that ELM327-derived protocols require commands to be terminated with \r.
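As a small illustration of the terminator rule, a helper like the following (the name `frame_command` is hypothetical) turns a command into the bytes to write, making sure it ends in exactly one carriage return:

```python
def frame_command(command):
    """Return the bytes to send for one command, ending in a single CR."""
    if isinstance(command, str):
        command = command.encode()
    # Drop any trailing CR, LF or spaces before adding the terminator.
    return command.rstrip(b"\r\n ") + b"\r"


print(frame_command("ATZ"))
print(frame_command(b"ATD\r\n"))
```

In the code from the question, this would mean writing `frame_command("ATZ")` instead of `b"ATZ"`.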
Upvotes: 0