我正在使用 Python 的 Telemetrix 库来控制 ESP32。我有一个带图形界面的 Tkinter 应用程序,希望能在应用程序运行期间连接和断开连接,以便切换当前要通信的微控制器。
开发环境:
这个闪烁示例对我来说工作正常。连接成功并 LED 闪烁
闪烁示例:https://github.com/MrYsLab/telemetrix-esp32/blob/master/examples/wifi-aio/blink_wifi_aio.py
这是我自己的示例代码,演示我的应用程序如何使用 asyncio 以及我在使用 Telemetrix 模块时遇到的问题。我想使用纯 asyncio,因此不需要使用线程模块。
import asyncio
import tkinter as tk
from telemetrix_aio_esp32 import telemetrix_aio_esp32
# Pin number handed to the board's digital-output calls when blinking.
DIGITAL_PIN = 2
class App:
    """Tkinter window that connects to an ESP32 via Telemetrix and blinks a pin.

    Button callbacks only raise request flags.  The ``show`` coroutine
    refreshes the Tk window in a loop and, between refreshes, awaits the
    coroutine that matches any flag that was raised.
    """

    def __init__(self):
        """Build the widgets and initialise the flags and the stored event loop."""
        self.root = tk.Tk()
        self.root.title("TelemetrixAioEsp32 Blink Example")

        # No board object exists until the user presses "Connect".
        self.board = None

        # Row 1: label + entry holding the board's IP address.
        self.ip_address_frame = tk.Frame(self.root)
        self.ip_address_frame.pack()
        self.ip_address_label = tk.Label(self.ip_address_frame, text="IP Address:")
        self.ip_address_label.pack(side="left")
        self.ip_address_entry = tk.Entry(self.ip_address_frame)
        self.ip_address_entry.pack(side="left", padx=5)
        self.ip_address_entry.insert("end", '192.168.1.196')

        # Row 2: connect/disconnect button and the blink button, which stays
        # disabled until a connection has been made.
        self.button_frame = tk.Frame(self.root)
        self.button_frame.pack()
        self.connect_button = tk.Button(
            self.button_frame, text="Connect", command=self.start_connect)
        self.connect_button.pack(side="left", padx=5)
        self.start_button = tk.Button(
            self.button_frame, text="Start Blink", command=self.start_blink,
            state=tk.DISABLED)
        self.start_button.pack(side="left")

        # Flags raised by the button callbacks and consumed by show().
        self.request_blink = False
        self.request_connect = False

        # Loop object that is later handed to the Telemetrix constructor.
        self.event_loop = asyncio.get_event_loop()
        asyncio.set_event_loop(self.event_loop)

        # Cleared by on_closing() to end the show() loop.
        self.showing = True
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    async def connect(self):
        """Toggle the connection: disconnect if a board exists, else connect."""
        if self.board is not None:
            await self.board.shutdown()
            self.board = None
            self.connect_button["text"] = "Connect"
            self.start_button['state'] = tk.DISABLED
            return

        address = self.ip_address_entry.get()
        self.board = telemetrix_aio_esp32.TelemetrixAioEsp32(
            transport_address=address, loop=self.event_loop, autostart=False)
        await self.board.start_aio()
        self.connect_button["text"] = "Disconnect"
        self.start_button['state'] = tk.NORMAL

    def _take_request(self, flag):
        """Return the current value of the named flag, clearing it if it was set."""
        requested = getattr(self, flag)
        if requested:
            setattr(self, flag, False)
        return requested

    async def show(self):
        """Refresh the window and service pending requests until it is closed."""
        while self.showing:
            self.root.update()
            if self._take_request("request_blink"):
                await self.blink(DIGITAL_PIN)
            if self._take_request("request_connect"):
                await self.connect()
        # Window closed: a board that is still connected gets shut down.
        if self.board is not None:
            await self.connect()

    async def blink(self, pin):
        """Configure *pin* as a digital output and write 0 then 1 four times.

        Each write is followed by a one-second pause.  Any exception is
        reported on stdout instead of being propagated.
        """
        try:
            await self.board.set_pin_mode_digital_output(pin)
            for _ in range(4):
                for label, level in (('ON', 0), ('OFF', 1)):
                    print(label)
                    await self.board.digital_write(pin, level)
                    await asyncio.sleep(1)
        except Exception as e:
            print(f"Error during blink: {e}")

    def start_blink(self):
        """Button callback: ask show() to run the blink sequence."""
        self.request_blink = True

    def start_connect(self):
        """Button callback: ask show() to toggle the connection."""
        self.request_connect = True

    async def run_tk(self):
        """Run the UI loop and close the stored event loop afterwards."""
        try:
            await self.show()
        finally:
            self.event_loop.close()

    def run(self):
        """Start the application.

        NOTE: ``asyncio.run()`` always creates and runs a new event loop, so
        the coroutines started here do not execute on ``self.event_loop``,
        which is the loop object passed to the board in ``connect``.
        """
        asyncio.run(self.run_tk())

    def on_closing(self):
        """Window-close handler: stop the show() loop and destroy the window."""
        self.showing = False
        self.root.destroy()
if __name__ == "__main__":
    # Script entry point: build the window and enter the asyncio-driven UI loop.
    App().run()
我使用 asyncio 运行自己的事件循环,用它来刷新 Tk 窗口,并根据用户的操作运行可能需要的异步方法。这种做法与 websockets 库配合得很好,可以启动和停止 websocket 连接。
但是上面的示例代码在连接到 ESP32 时会产生以下错误
PS C:\Users\lokno\Desktop\Scratch\Code> python .\blink_wifi_app.py
TelemetrixAioEsp32 Version: 1.3
Copyright (c) 2022 Alan Yorinks All rights reserved.
Successfully connected to: 192.168.1.196:31336
Traceback (most recent call last):
File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 110, in <module>
app.run()
File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 102, in run
asyncio.run(self.run_tk())
File "C:\Users\lokno\scoop\apps\python\current\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\lokno\scoop\apps\python\current\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lokno\scoop\apps\python\current\Lib\asyncio\base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 97, in run_tk
await self.show();
^^^^^^^^^^^^^^^^^
File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 69, in show
await self.connect()
File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 48, in connect
await self.board.start_aio()
File "C:\Users\lokno\scoop\apps\python\current\Lib\site-packages\telemetrix_aio_esp32\telemetrix_aio_esp32.py", line 282, in start_aio
raise RuntimeError('Could not retrieve server firmware version')
RuntimeError: Could not retrieve server firmware version
我认为我将 asyncio 与 telemetrix 库配合使用的方式有误。我向库传入了自己的事件循环,并设置了 autostart=False;这个参数只决定是否执行下面这两行:
if autostart:
self.loop.run_until_complete(self.start_aio())
当前的错误没有帮助,我被困住了。它表明我运行 start_aio() 的方式有问题,但我不确定到底是什么。我很感激任何调试建议。
Telemetrix 库的仓库中目前没有未关闭的 issue。
这是我导入的主要模块: https://github.com/MrYsLab/telemetrix-esp32/blob/master/telemetrix_aio_esp32/telemetrix_aio_esp32.py
我正在开发的实际应用程序位于此处: https://github.com/Lokno/ArduinoControlUtilities/blob/main/websocket_pyfirmata.py
谢谢您的帮助。
我已经解决了这个问题。我之前没有意识到便捷函数 asyncio.run() 会自行创建一个新的事件循环;我原以为它会使用我先前通过 asyncio.set_event_loop() 设置的事件循环。因此,我传给 TelemetrixAioEsp32 构造函数的事件循环,与实际运行 connect 方法的事件循环并不是同一个。
以下是更正后的代码,其中的按钮现在用于打开和关闭 LED。
import asyncio
import tkinter as tk
from telemetrix_aio_esp32 import telemetrix_aio_esp32
# Pin number handed to the board's digital-output calls that drive the LED.
DIGITAL_PIN = 2
class App:
    """Tkinter front end that connects to an ESP32 via Telemetrix and toggles an LED.

    Button callbacks never await anything themselves: they raise a request
    flag and disable their button.  The ``show`` coroutine alternates between
    refreshing the Tk window and servicing those flags, so GUI work and board
    I/O share one asyncio event loop without threads.
    """

    # Seconds to sleep between Tk refreshes.  The sleep hands control back to
    # the event loop so other tasks scheduled on it get to run, and it keeps
    # the polling loop from spinning a CPU core.
    UI_POLL_INTERVAL = 0.01

    def __init__(self):
        """Create the widgets, the request flags and the shared event loop."""
        self.root = tk.Tk()
        self.root.title("TelemetrixAioEsp32 Blink Example")

        # No board object exists until the user presses "Connect".
        self.board = None

        # Row 1: label + entry holding the board's IP address.
        self.ip_address_frame = tk.Frame(self.root)
        self.ip_address_frame.pack()
        self.ip_address_label = tk.Label(self.ip_address_frame, text="IP Address:")
        self.ip_address_label.pack(side="left")
        self.ip_address_entry = tk.Entry(self.ip_address_frame)
        self.ip_address_entry.pack(side="left", padx=5)
        self.ip_address_entry.insert("end", "192.168.1.196")

        # Row 2: connect/disconnect button and the LED toggle button, which
        # stays disabled until a board is connected.
        self.button_frame = tk.Frame(self.root)
        self.button_frame.pack()
        self.connect_button = tk.Button(
            self.button_frame, text="Connect", command=self.start_connect)
        self.connect_button.pack(side="left", padx=5)
        self.start_button = tk.Button(
            self.button_frame, text="On", command=self.start_blink)
        self.start_button.pack(side="left")
        self.start_button["state"] = tk.DISABLED

        # Flags raised by the button callbacks and consumed by show().
        self.request_blink = False
        self.request_connect = False

        # One loop for everything: it is passed to the board in connect() and
        # it is the loop run() executes.  Created explicitly because calling
        # asyncio.get_event_loop() with no current loop is deprecated.
        self.event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.event_loop)

        # Cleared by on_closing() to end the show() loop.
        self.showing = True
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        # Last level written to the LED pin (True means 1 was written).
        self.led_on = False

    async def connect(self):
        """Toggle the board connection: connect when idle, otherwise disconnect.

        Widgets are only touched while the window still exists
        (``self.showing``), because this coroutine is also used to disconnect
        after the window has been closed.
        """
        if self.board is None:
            await self._open_board()
        else:
            await self.board.shutdown()
            self.board = None
            if self.showing:
                self.connect_button["text"] = "Connect"
                self.start_button["state"] = tk.DISABLED
        # start_connect() disabled the button; make it clickable again.
        if self.showing:
            self.connect_button["state"] = tk.NORMAL

    async def _open_board(self):
        """Connect to the address in the entry field and prepare the LED pin.

        On success the pin is configured as a digital output, driven to 0,
        and the buttons are switched to their connected state.  On failure
        the error is printed and the application stays disconnected.
        """
        ip_address = self.ip_address_entry.get()
        board = telemetrix_aio_esp32.TelemetrixAioEsp32(
            transport_address=ip_address, loop=self.event_loop, autostart=False)
        try:
            await board.start_aio()
            await board.set_pin_mode_digital_output(DIGITAL_PIN)
            await board.digital_write(DIGITAL_PIN, 0)
        except (OSError, RuntimeError) as e:
            # Keep the UI alive so the user can correct the address and retry.
            # NOTE(review): a partially started board is not shut down here;
            # confirm whether the library needs explicit cleanup in that case.
            print(f"Error during connect: {e}")
            return
        # Only publish the board once it is fully usable.
        self.board = board
        self.led_on = False
        self.start_button["text"] = "On"
        self.connect_button["text"] = "Disconnect"
        self.start_button["state"] = tk.NORMAL

    async def _toggle_led(self):
        """Write the opposite level to the LED pin and relabel the toggle button."""
        if self.board is None:
            # Nothing to drive; the toggle button stays disabled.
            return
        turn_on = not self.led_on
        await self.board.digital_write(DIGITAL_PIN, 1 if turn_on else 0)
        # Short pause carried over unchanged; presumably lets the write go
        # out before the button is re-enabled (TODO confirm).
        await asyncio.sleep(0.001)
        # The button always offers the action opposite to the current state.
        self.start_button["text"] = "Off" if turn_on else "On"
        self.led_on = turn_on
        # start_blink() disabled the button; make it clickable again.
        self.start_button["state"] = tk.NORMAL

    async def show(self):
        """Refresh the window and service pending requests until it is closed.

        After the loop ends, a board that is still connected is shut down.
        """
        while self.showing:
            self.root.update()
            if not self.showing:
                # The window was destroyed inside update(); its widgets are
                # gone, so pending requests must not be serviced.
                break
            if self.request_blink:
                self.request_blink = False
                await self._toggle_led()
            if self.request_connect:
                self.request_connect = False
                await self.connect()
            # Yield to the event loop between refreshes.
            await asyncio.sleep(self.UI_POLL_INTERVAL)
        if self.board is not None:
            await self.connect()

    def start_blink(self):
        """Button callback: request an LED toggle and lock the button meanwhile."""
        self.request_blink = True
        self.start_button["state"] = tk.DISABLED

    def start_connect(self):
        """Button callback: request a connection toggle and lock the button meanwhile."""
        self.request_connect = True
        self.connect_button["state"] = tk.DISABLED

    def run(self):
        """Run the UI on ``self.event_loop`` and close the loop when it ends."""
        try:
            self.event_loop.run_until_complete(self.show())
        finally:
            self.event_loop.close()

    def on_closing(self):
        """Window-close handler: stop the show() loop and destroy the window."""
        self.showing = False
        self.root.destroy()
if __name__ == "__main__":
    # Script entry point: build the window and run the UI on the app's event loop.
    App().run()