我编写了一个可通过命令行运行的 Python 脚本。我想将脚本拆分为 REST 端点。
为此,我想将
async with BleakClient(d, timeout=10) as client:
替换为某种能让我把 client
设为全局变量的写法,以便我可以在另一个 REST 端点中使用它。换句话说,我希望 client
在连接设备的那次调用结束之后仍然存在。有什么想法吗?
我尝试了
client = await BleakClient(d, timeout=10)
,但是这一行抛出了异常:object BleakClient can't be used in 'await' expression
。
编辑
我附上了完整的(有删节的)Python 脚本。调用 ConnectAndReadChar() 可以正常工作,调用 ConnectUUT() 则不行:'client = await BleakClient(d, timeout=10)' 这一行会抛出异常。
import asyncio
import traceback
from bleak import BleakScanner, BleakClient
import sys
import time
# Shared BLE client. ConnectUUT() rebinds it; ReadFrames() and DisconnectUUT()
# read it. A module-level ``global`` statement does not create the name, so it
# is bound to None here to give "not connected yet" a defined value.
client = None
async def ConnectAndReadChar(MAC_ID):
    """Scan for a device, connect, pair, read one characteristic, then clean up.

    The connection lives only inside this call: the client is opened with
    ``async with`` and is disconnected and unpaired before returning.
    Any exception raised during an attempt is printed (its class only) and
    the attempt is retried indefinitely.

    Args:
        MAC_ID: Device address string handed to
            ``BleakScanner.find_device_by_address``.

    Returns:
        None. The pairing result, connection state and characteristic value
        are printed.
    """
    d = await BleakScanner.find_device_by_address(MAC_ID, timeout=20)
    if d is None:
        # Nothing was found within the scan timeout. Retrying below would only
        # spin on the same missing device, so report it and give up.
        print("Device {0} not found".format(MAC_ID))
        return
    if d.address.lower() != MAC_ID.lower():
        # Checked once, up front: a mismatch inside the retry loop would loop
        # forever without ever yielding to the event loop.
        print("Found {0}, expected {1}".format(d.address, MAC_ID))
        return

    # asyncio.sleep instead of time.sleep: a blocking sleep stalls every other
    # task running on the event loop.
    await asyncio.sleep(0.5)
    while True:
        try:
            async with BleakClient(d, timeout=10) as client:
                await asyncio.sleep(0.5)
                res = await client.pair(protection_level=None)
                print(res)
                x = client.is_connected
                print("Connected: {0}".format(x))
                svc = client.services.get_service("Service ID")
                thisCharacteristic = svc.get_characteristic("Char ID")
                value = bytes(await client.read_gatt_char(thisCharacteristic))
                print("\tValue: {0} ".format(value))
                await client.disconnect()
                await client.unpair()
                return
        except Exception as exc:
            # ``except Exception`` (not a bare except) lets task cancellation
            # and Ctrl-C escape the retry loop.
            print(type(exc))
            # Yield before retrying so a persistent failure is not a busy loop.
            await asyncio.sleep(0.5)
async def ConnectUUT(MAC_ID):
    """Connect to and pair with a device, keeping the client alive afterwards.

    On success the connected client is stored in the module-level ``client``
    so that later calls (ReadFrames, DisconnectUUT) can use it. Any exception
    raised during an attempt is printed (its class only) and the attempt is
    retried indefinitely.

    Args:
        MAC_ID: Device address string handed to
            ``BleakScanner.find_device_by_address``.

    Returns:
        None. The pairing result and connection state are printed.
    """
    global client
    d = await BleakScanner.find_device_by_address(MAC_ID, timeout=20)
    if d is None:
        # Nothing was found within the scan timeout. Retrying below would only
        # spin on the same missing device, so report it and give up.
        print("Device {0} not found".format(MAC_ID))
        return
    if d.address.lower() != MAC_ID.lower():
        # Checked once, up front: a mismatch inside the retry loop would loop
        # forever without ever yielding to the event loop.
        print("Found {0}, expected {1}".format(d.address, MAC_ID))
        return

    while True:
        # BleakClient(...) is an ordinary constructor and cannot be awaited.
        # Outside ``async with`` the connection is opened explicitly.
        candidate = BleakClient(d, timeout=10)
        try:
            await candidate.connect()
            # asyncio.sleep instead of time.sleep: a blocking sleep stalls
            # every other task running on the event loop.
            await asyncio.sleep(0.5)
            res = await candidate.pair(protection_level=None)
        except Exception as exc:
            # ``except Exception`` (not a bare except) lets task cancellation
            # and Ctrl-C escape the retry loop.
            print(type(exc))
            try:
                # Drop a half-open connection so retries do not pile up
                # connected clients.
                if candidate.is_connected:
                    await candidate.disconnect()
            except Exception as cleanup_exc:
                print(type(cleanup_exc))
            await asyncio.sleep(0.5)
            continue

        # Publish the client only once it is connected and paired.
        client = candidate
        print(res)
        print("Connected: {0}".format(client.is_connected))
        return
async def ReadFrames():
    """Look up the target service and characteristic on the shared client.

    Reads the module-level ``client``; no ``global`` declaration is needed
    because the name is only read here. Any exception, including the client
    not being set up yet, is reported by printing its class.

    Returns:
        None.
    """
    try:
        svc = client.services.get_service("Service ID")
        thisCharacteristic = svc.get_characteristic("Char ID")
        return
    except Exception as exc:
        # ``except Exception`` (not a bare except) so that task cancellation
        # and Ctrl-C are not swallowed.
        print(type(exc))
async def DisconnectUUT():
    """Disconnect from and unpair the device held by the shared client.

    Reads the module-level ``client``; no ``global`` declaration is needed
    because the name is only read here. Any exception, including the client
    not being set up yet, is reported by printing its class.

    Returns:
        None.
    """
    try:
        await client.disconnect()
        await client.unpair()
    except Exception as exc:
        # ``except Exception`` (not a bare except) so that task cancellation
        # and Ctrl-C are not swallowed.
        print(type(exc))
    return
async def main():
    """Run the self-contained read, then the connect / read / disconnect steps."""
    device_address = "MAC ID goes here"

    # One-shot path: the connection is opened and closed inside the call.
    await ConnectAndReadChar(device_address)

    # Split path: the connection is expected to outlive ConnectUUT().
    await ConnectUUT(device_address)
    await ReadFrames()
    await DisconnectUUT()
# Script entry point: run main() on a fresh event loop only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
如果将 BleakClient 用作异步上下文管理器(async with),它会在进入时隐式调用
connect
,并在退出时自动断开连接。您也可以显式地执行此操作(之后需要自行调用 disconnect):
client = BleakClient(d, timeout=10)
await client.connect()