使用Python和bleak库通知蓝牙GATT设备,但结果不稳定

问题描述 投票:0回答:2

我尝试使用Python来控制一些BLE GATT设备。 我找到 Bleak (https://bleak.readthedocs.io/en/latest/) 这个库来与 GATT 设备通信。

当我使用它时,大多数时候效果都很好。

但是当我尝试与 BLE GATT 医疗设备通信时,我发现我不能总是用该设备进行通知。

这是我的代码,我只是对 Bleak 的 github 示例做了一些修改:(https://github.com/hbldh/bleak/blob/develop/examples/enable_notifications.py)

import asyncio
import logging

from bleak import discover
from bleak import BleakClient

devices_dict = {}
devices_list = []
receive_data = []

#To discover BLE devices nearby 
async def scan():
    dev = await discover()
    for i in range(0,len(dev)):
        #Print the devices discovered
        print("[" + str(i) + "]" + dev[i].address,dev[i].name,dev[i].metadata["uuids"])
        #Put devices information into list
        devices_dict[dev[i].address] = []
        devices_dict[dev[i].address].append(dev[i].name)
        devices_dict[dev[i].address].append(dev[i].metadata["uuids"])
        devices_list.append(dev[i].address)

#An easy notify function, just print the recieve data
def notification_handler(sender, data):
    print(', '.join('{:02x}'.format(x) for x in data))

async def run(address, debug=False):
    log = logging.getLogger(__name__)
    if debug:
        import sys

        log.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        log.addHandler(h)

    async with BleakClient(address) as client:
        x = await client.is_connected()
        log.info("Connected: {0}".format(x))

        for service in client.services:
            log.info("[Service] {0}: {1}".format(service.uuid, service.description))
            for char in service.characteristics:
                if "read" in char.properties:
                    try:
                        value = bytes(await client.read_gatt_char(char.uuid))
                    except Exception as e:
                        value = str(e).encode()
                else:
                    value = None
                log.info(
                    "\t[Characteristic] {0}: (Handle: {1}) ({2}) | Name: {3}, Value: {4} ".format(
                        char.uuid,
                        char.handle,
                        ",".join(char.properties),
                        char.description,
                        value,
                    )
                )
                for descriptor in char.descriptors:
                    value = await client.read_gatt_descriptor(descriptor.handle)
                    log.info(
                        "\t\t[Descriptor] {0}: (Handle: {1}) | Value: {2} ".format(
                            descriptor.uuid, descriptor.handle, bytes(value)
                        )
                    )

                #Characteristic uuid
                CHARACTERISTIC_UUID = "put your characteristic uuid"

                await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
                await asyncio.sleep(5.0)
                await client.stop_notify(CHARACTERISTIC_UUID)

if __name__ == "__main__":
    print("Scanning for peripherals...")

    #Build an event loop
    loop = asyncio.get_event_loop()
    #Run the discover event
    loop.run_until_complete(scan())

    #let user chose the device
    index = input('please select device from 0 to ' + str(len(devices_list)) + ":")
    index = int(index)
    address = devices_list[index]
    print("Address is " + address)

    #Run notify event
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(run(address, True))

在大多数情况下,此代码可以很好地工作this image

但有时候(大约25%的比例),这个问题就发生了:

[0]08:6B:D7:12:F1:33 Nonin3150_502892837['uuid']
[1]1D:BD:4A:69:8B:AB Unknown []
[2]73:15:CD:47:AF:08 Unknown []
[3]40:4E:36:5B:8D:1B HTC BS 1BBDB9 ['uuid']
[4]6B:FB:E5:DD:7F:4E Unknown []
[5]69:A7:87:23:5C:7C Unknown []
please select device from 0 to 6:0
Address is 08:6B:D7:12:F1:33
Traceback (most recent call last):
  File "D:/Bletest/nonin_test.py", line 91, in <module>
    loop.run_until_complete(run(address, True))
  File "C:\Users\rizal\AppData\Local\Programs\Python\Python37\lib\asyncio\base_events.py", line 579, in run_until_complete
    return future.result()
  File "D:/Bletest/nonin_test.py", line 36, in run
    async with BleakClient(address) as client:
  File "C:\Users\rizal\AppData\Local\Programs\Python\Python37\lib\site-packages\bleak\backends\client.py", line 60, in __aenter__
    await self.connect()
  File "C:\Users\rizal\AppData\Local\Programs\Python\Python37\lib\site-packages\bleak\backends\dotnet\client.py", line 154, in connect
    "Device with address {0} was not found.".format(self.address)
bleak.exc.BleakError: Device with address 08:6B:D7:12:F1:33 was not found.

Process finished with exit code 1

我不知道为什么程序可以发现这个设备,但无法通知它。

这是我代码的问题,还是这个设备的程序流程造成的?

python bluetooth-lowenergy gatt
2个回答
1
投票

我也有同样的问题。不过,我认为这与BLE在Windows中的实现有关。当您在 Windows 界面中扫描设备时,有时可以看到设备如何出现和消失。设备广告间隔可能太长。

但是,可以通过将其包装在 try catch 部分中来修复。

这样的东西可能对你有用。

    async def connect_to_device(self):
        while True:
            if self.connection_enabled:
                try:
                    await self.client.connect()
                    self.connected = await self.client.is_connected()
                    if self.connected:
                        print("Connected to Device")
                        self.client.set_disconnected_callback(self.on_disconnect)
                        await self.client.start_notify(
                            self.notify_characteristic, self.notify_callback,
                        )
                        while True:
                            if not self.connected:
                                break
                            await asyncio.sleep(1.0)
                    else:
                        print(f"Failed to connect to Device")
                except Exception as e:
                    print(e)
            else:
                await asyncio.sleep(1.0)

您只需将此任务添加到循环中

asyncio.ensure_future(self.connect_to_device(), loop)

定义客户

self.client = BleakClient(self.connected_device.address, loop=loop)

并启用连接

self.connection_enabled = true

0
投票

我尝试通过蓝牙将我的 iPhone 与树莓派连接,以便在有人呼叫 iPhone 时编写接口。我总是偶然发现错误“org.bluez.Erreor:未连接并找到您的解决方案。请您给我完整的建议,在哪里实现 asyncio.ensure_future(self.connect_to_device(),loop) self.client = BleakClient(self.connected_device.address, 循环=循环) 和 self.connection_enabled = true 谢谢!!!!°

© www.soinside.com 2019 - 2024. All rights reserved.