Python Tkinter:与 Telemetrix-AIO 共享异步事件循环

问题描述 投票:0回答:1

我正在使用 Python Telemetrix 库来控制 ESP32。我有一个带有接口的 Tkinter 应用程序,我想在应用程序运行时进行连接和断开连接,以便我可以更改正在寻址的微控制器。

开发环境:

  • Windows 11
  • Python 3.11.5

这个闪烁示例对我来说工作正常。连接成功并 LED 闪烁

闪烁示例:https://github.com/MrYsLab/telemetrix-esp32/blob/master/examples/wifi-aio/blink_wifi_aio.py

这是我自己的示例代码,演示我的应用程序如何使用 asyncio 以及我在使用 Telemetrix 模块时遇到的问题。我想使用纯 asyncio,因此不需要使用线程模块。

import asyncio
import tkinter as tk
from telemetrix_aio_esp32 import telemetrix_aio_esp32

DIGITAL_PIN = 2

class App:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("TelemetrixAioEsp32 Blink Example")

        self.board = None

        self.ip_address_frame = tk.Frame(self.root)
        self.ip_address_frame.pack()

        self.ip_address_label = tk.Label(self.ip_address_frame, text="IP Address:")
        self.ip_address_label.pack(side="left")

        self.ip_address_entry = tk.Entry(self.ip_address_frame)
        self.ip_address_entry.pack(side="left", padx=5)
        self.ip_address_entry.insert("end",'192.168.1.196')  

        self.button_frame = tk.Frame(self.root)
        self.button_frame.pack()

        self.connect_button = tk.Button(self.button_frame, text="Connect", command=self.start_connect)
        self.connect_button.pack(side="left", padx=5)

        self.start_button = tk.Button(self.button_frame, text="Start Blink", command=self.start_blink)
        self.start_button.pack(side="left")
        self.start_button['state'] = tk.DISABLED

        self.request_blink = False
        self.request_connect = False

        self.event_loop = asyncio.get_event_loop()
        asyncio.set_event_loop(self.event_loop)

        self.showing = True

        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    async def connect(self):
        if self.board is None:
            ip_address = self.ip_address_entry.get()
            self.board = telemetrix_aio_esp32.TelemetrixAioEsp32(transport_address=ip_address, loop=self.event_loop, autostart=False)
            await self.board.start_aio()

            self.connect_button["text"] = "Disconnect"
            self.start_button['state'] = tk.NORMAL
        else:

            await self.board.shutdown()

            self.board = None
            self.connect_button["text"] = "Connect"
            self.start_button['state'] = tk.DISABLED

    async def show(self):
        while self.showing:
            self.root.update()

            if self.request_blink:
                self.request_blink = False
                await self.blink(DIGITAL_PIN)
            if self.request_connect:
                self.request_connect = False
                await self.connect()

        if self.board is not None:
            await self.connect() 

    async def blink(self, pin):
        try:
            await self.board.set_pin_mode_digital_output(pin)

            for x in range(4):
                print('ON')
                await self.board.digital_write(pin, 0)
                await asyncio.sleep(1)
                print('OFF')
                await self.board.digital_write(pin, 1)
                await asyncio.sleep(1)

        except Exception as e:
            print(f"Error during blink: {e}")

    def start_blink(self):
        self.request_blink = True

    def start_connect(self):
        self.request_connect = True

    async def run_tk(self):
        try:
            await self.show();
        finally:
            self.event_loop.close()

    def run(self):
        asyncio.run(self.run_tk())

    def on_closing(self):
        self.showing = False
        self.root.destroy()

if __name__ == "__main__":
    app = App()
    app.run()

我正在使用 asyncio 运行我自己的事件循环来处理更新 tkwindow,以及我可能需要根据用户选择运行的任何异步方法。这与 websockets 库配合得很好,可以启动和停止 websocket 连接。

但是上面的示例代码在连接到 ESP32 时会产生以下错误

PS C:\Users\lokno\Desktop\Scratch\Code> python .\blink_wifi_app.py
TelemetrixAioEsp32 Version: 1.3
Copyright (c) 2022 Alan Yorinks All rights reserved.

Successfully connected to: 192.168.1.196:31336
Traceback (most recent call last):
  File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 110, in <module>
    app.run()
  File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 102, in run
    asyncio.run(self.run_tk())
  File "C:\Users\lokno\scoop\apps\python\current\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\lokno\scoop\apps\python\current\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lokno\scoop\apps\python\current\Lib\asyncio\base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 97, in run_tk
    await self.show();
    ^^^^^^^^^^^^^^^^^
  File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 69, in show
    await self.connect()
  File "C:\Users\lokno\Desktop\Scratch\Code\blink_wifi_app.py", line 48, in connect
    await self.board.start_aio()
  File "C:\Users\lokno\scoop\apps\python\current\Lib\site-packages\telemetrix_aio_esp32\telemetrix_aio_esp32.py", line 282, in start_aio
    raise RuntimeError('Could not retrieve server firmware version')
RuntimeError: Could not retrieve server firmware version

我认为我将 asyncio 与 telemetrix 库一起使用的方式存在错误。我正在发送自己的事件循环并设置 autostart=False ,它只运行以下行:

if autostart:
    self.loop.run_until_complete(self.start_aio())

当前的错误没有帮助,我被困住了。它表明我运行 start_aio() 的方式有问题,但我不确定到底是什么。我很感激任何调试建议。

Telemetrix 库没有未解决的问题。

这是我导入的主要模块: https://github.com/MrYsLab/telemetrix-esp32/blob/master/telemetrix_aio_esp32/telemetrix_aio_esp32.py

我正在开发的实际应用程序位于此处: https://github.com/Lokno/ArduinoControlUtilities/blob/main/websocket_pyfirmata.py

谢谢您的帮助。

python-3.x tkinter python-asyncio esp32
1个回答
0
投票

我已经解决了这个问题。我没有意识到便利方法 asyncio.run() 本身正在创建一个事件循环。我认为它可能使用了之前设置的事件循环,因为我之前调用了 asyncio.set_event_loop() 。因此,我发送到 TelemetrixAioEsp32 构造函数的循环与运行 connect 方法的循环不同。

更正了代码,现在有一个按钮可以让 LED 闪烁。

import asyncio
import tkinter as tk
from telemetrix_aio_esp32 import telemetrix_aio_esp32

DIGITAL_PIN = 2

class App:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("TelemetrixAioEsp32 Blink Example")

        self.board = None

        self.ip_address_frame = tk.Frame(self.root)
        self.ip_address_frame.pack()

        self.ip_address_label = tk.Label(self.ip_address_frame, text="IP Address:")
        self.ip_address_label.pack(side="left")

        self.ip_address_entry = tk.Entry(self.ip_address_frame)
        self.ip_address_entry.pack(side="left", padx=5)
        self.ip_address_entry.insert("end",'192.168.1.196')  

        self.button_frame = tk.Frame(self.root)
        self.button_frame.pack()

        self.connect_button = tk.Button(self.button_frame, text="Connect", command=self.start_connect)
        self.connect_button.pack(side="left", padx=5)

        self.start_button = tk.Button(self.button_frame, text="On", command=self.start_blink)
        self.start_button.pack(side="left")
        self.start_button['state'] = tk.DISABLED

        self.request_blink = False
        self.request_connect = False

        self.event_loop = asyncio.get_event_loop()
        asyncio.set_event_loop(self.event_loop)

        self.showing = True

        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

        self.led_on = False

    async def connect(self):
        if self.board is None:
            ip_address = self.ip_address_entry.get()
            self.board = telemetrix_aio_esp32.TelemetrixAioEsp32(transport_address=ip_address, loop=self.event_loop, autostart=False)
            await self.board.start_aio()

            await self.board.set_pin_mode_digital_output(DIGITAL_PIN)
            await self.board.digital_write(DIGITAL_PIN, 0)

            self.led_on = False
            self.start_button["text"] = "On"
            self.connect_button["text"] = "Disconnect"
            self.start_button['state'] = tk.NORMAL
        else:
            await self.board.shutdown()
            self.board = None

            if self.showing:
                self.connect_button["text"] = "Connect"
                self.start_button['state'] = tk.DISABLED

        if self.showing:
            self.connect_button['state'] = tk.NORMAL

    async def show(self):
        while self.showing:
            self.root.update()

            if self.request_blink:
                self.request_blink = False
                if self.led_on:
                    await self.board.digital_write(DIGITAL_PIN, 0)
                    await asyncio.sleep(0.001)
                    self.start_button["text"] = "On"
                    self.led_on = False
                else:
                    await self.board.digital_write(DIGITAL_PIN, 1)
                    await asyncio.sleep(0.001)
                    self.start_button["text"] = "Off"
                    self.led_on = True  
                self.start_button['state'] = tk.NORMAL          
            if self.request_connect:
                self.request_connect = False
                await self.connect()

        if self.board is not None:
            await self.connect() 

    def start_blink(self):
        self.request_blink = True
        self.start_button['state'] = tk.DISABLED

    def start_connect(self):
        self.request_connect = True
        self.connect_button['state'] = tk.DISABLED

    def run(self):
        self.event_loop.run_until_complete(self.show())

    def on_closing(self):
        self.showing = False
        self.root.destroy()

if __name__ == "__main__":
    app = App()
    app.run()
© www.soinside.com 2019 - 2024. All rights reserved.