Taipy,threading.Thread 和 app.app_context()

问题描述 投票:0回答:1

我有一个带有子页面的主文件 GUI Taipy'。在其中一个子页面中,我使用了一个名为“Jiga”的类,在其中,我使用一个线程来扫描硬件接口上的四个物理按钮。

当我使用以下代码时:

thread1 = threading.Thread(target=scan_teclas, args=(state.Jiga,))

代码正常执行。但是,在线程函数中,我只能访问“state.Jiga”类中的变量。

如果我使用以下代码:

thread1 = threading.Thread(target=scan_teclas, args=(state,))

代码无法执行,遇到以下错误:

"RuntimeError: Working outside of the application context. This typically means that you attempted to use functionality that requires the current application context. To solve this, set up an application context with app.app_context(). See the documentation for more information."

我怀疑线程在 Flask 或 Taipy 上下文之外运行,这导致了问题。但是,当使用“state.Jiga”时,我没有遇到这个问题。这是正确的吗?”如何制作app_context?

python-multithreading taipy
1个回答
0
投票

Taipy 的多线程是一个复杂的问题;我们正在努力更多地记录它。我可能需要更多的代码来准确解决您的问题;也许弗洛里安·贾克塔有一个想法。同时,我有一个简单的示例,您可以将其用作使用 Taipy 执行多线程代码的基础:

第1步: 安装 Taipy 的最新开发版本

pip install taipy==3.0.0.dev3

第2步: 运行此脚本,该脚本将运行 Taipy 应用程序,并使用套接字等待接收信息:

import socket
from threading import Thread
from taipy.gui import Gui, State, invoke_callback, get_state_id

HOST = "127.0.0.1"
PORT = 65432


# Socket handler
def client_handler(gui: Gui, state_id_list: list):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((HOST, PORT))
    s.listen()
    conn, addr = s.accept()
    while True:
        if data := conn.recv(1024):
            print("Data received: ", data.decode())
            if hasattr(gui, "_server") and state_id_list:
                invoke_callback(
                    gui, state_id_list[0], update_case_count, (int(data.decode()),)
                )
        else:
            print("Connection closed")
            break


# Gui declaration
state_id_list = []

Gui.add_shared_variable("case_count")


def on_init(state: State):
    state_id = get_state_id(state)
    if (state_id := get_state_id(state)) is not None and state_id != "":
        state_id_list.append(state_id)


def update_case_count(state: State, val):
    state.case_count = val
    # state.broadcast("case_count", val)


case_count = 0

md = """
# Covid Tracker

Number of cases: <|{case_count}|>
"""
gui = Gui(page=md)

t = Thread(
    target=client_handler,
    args=(
        gui,
        state_id_list,
    ),
)
t.start()
gui.run(run_browser=False)

第3步: 在另一个向应用程序发送信息的终端中同时运行此脚本。发送的信息应该实时显示在应用程序上

# echo-client.py

from random import randint
import time
import socket

HOST = "127.0.0.1"
PORT = 65432

random_number = randint(1, 100000)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    while True:
        s.sendall(str(random_number).encode())
        random_number += randint(0, 50)
        print(random_number)
        time.sleep(5)

请告诉我这是否有帮助。请随时与我们联系,了解有关您问题的更多背景信息,尤其是代码。

© www.soinside.com 2019 - 2024. All rights reserved.