我有一个小型 GUI 测试,带有“开始”按钮和进度栏。期望的行为是:
观察到的行为是“开始”按钮冻结 5 秒,然后显示进度条(无振荡)。
这是迄今为止我的代码:
class GUI:
def __init__(self, master):
self.master = master
self.test_button = Button(self.master, command=self.tb_click)
self.test_button.configure(
text="Start", background="Grey",
padx=50
)
self.test_button.pack(side=TOP)
def progress(self):
self.prog_bar = ttk.Progressbar(
self.master, orient="horizontal",
length=200, mode="indeterminate"
)
self.prog_bar.pack(side=TOP)
def tb_click(self):
self.progress()
self.prog_bar.start()
# Simulate long running process
t = threading.Thread(target=time.sleep, args=(5,))
t.start()
t.join()
self.prog_bar.stop()
root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()
根据 Bryan Oakley here 提供的信息,我知道我需要使用线程。我尝试创建一个线程,但我猜测由于该线程是从主线程内启动的,所以它没有帮助。
我的想法是将逻辑部分放在不同的类中,并从该类中实例化 GUI,类似于 A. Rodas here.
的示例代码我的问题:
我不知道如何编码以便执行此命令:
self.test_button = Button(self.master, command=self.tb_click)
调用位于另一个类中的函数。这是一件坏事还是有可能?我将如何创建一个可以处理 self.tb_click 的第二类?我尝试按照 A. Rodas 的示例代码进行操作,效果非常好。但我无法弄清楚如何在触发操作的 Button 小部件的情况下实现他的解决方案。
如果我应该从单个 GUI 类中处理线程,那么如何创建一个不干扰主线程的线程?
当您在主线程中加入新线程时,它将等待直到该线程完成,因此即使您使用多线程,GUI也会阻塞。
如果你想将逻辑部分放在不同的类中,你可以直接子类化Thread,然后在按下按钮时启动该类的一个新对象。 Thread 子类的构造函数可以接收一个 Queue 对象,然后您就可以与 GUI 部分进行通信。所以我的建议是:
然后你必须解决如果用户单击同一个按钮两次会发生什么的问题(每次单击都会产生一个新线程),但是你可以通过禁用开始按钮并在调用后再次启用它来修复它
self.prog_bar.stop()
。
import queue
class GUI:
# ...
def tb_click(self):
self.progress()
self.prog_bar.start()
self.queue = queue.Queue()
ThreadedTask(self.queue).start()
self.master.after(100, self.process_queue)
def process_queue(self):
try:
msg = self.queue.get_nowait()
# Show result of the task if needed
self.prog_bar.stop()
except queue.Empty:
self.master.after(100, self.process_queue)
class ThreadedTask(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
time.sleep(5) # Simulate long running process
self.queue.put("Task finished")
我将提交替代解决方案的基础。它本身并不特定于 Tk 进度条,但它当然可以非常容易地实现。
这里有一些类,允许您在 Tk 后台运行其他任务,在需要时更新 Tk 控件,而不是锁定 gui!
这是 TkRepeatingTask 和 BackgroundTask 类:
import threading
class TkRepeatingTask():
def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
self.__tk_ = tkRoot
self.__func_ = taskFuncPointer
self.__freq_ = freqencyMillis
self.__isRunning_ = False
def isRunning( self ) : return self.__isRunning_
def start( self ) :
self.__isRunning_ = True
self.__onTimer()
def stop( self ) : self.__isRunning_ = False
def __onTimer( self ):
if self.__isRunning_ :
self.__func_()
self.__tk_.after( self.__freq_, self.__onTimer )
class BackgroundTask():
def __init__( self, taskFuncPointer ):
self.__taskFuncPointer_ = taskFuncPointer
self.__workerThread_ = None
self.__isRunning_ = False
def taskFuncPointer( self ) : return self.__taskFuncPointer_
def isRunning( self ) :
return self.__isRunning_ and self.__workerThread_.isAlive()
def start( self ):
if not self.__isRunning_ :
self.__isRunning_ = True
self.__workerThread_ = self.WorkerThread( self )
self.__workerThread_.start()
def stop( self ) : self.__isRunning_ = False
class WorkerThread( threading.Thread ):
def __init__( self, bgTask ):
threading.Thread.__init__( self )
self.__bgTask_ = bgTask
def run( self ):
try :
self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
except Exception as e: print repr(e)
self.__bgTask_.stop()
这是一个 Tk 测试,演示了这些的使用。如果您想查看实际演示,只需将其附加到包含这些类的模块底部即可:
def tkThreadingTest():
from tkinter import Tk, Label, Button, StringVar
from time import sleep
class UnitTestGUI:
def __init__( self, master ):
self.master = master
master.title( "Threading Test" )
self.testButton = Button(
self.master, text="Blocking", command=self.myLongProcess )
self.testButton.pack()
self.threadedButton = Button(
self.master, text="Threaded", command=self.onThreadedClicked )
self.threadedButton.pack()
self.cancelButton = Button(
self.master, text="Stop", command=self.onStopClicked )
self.cancelButton.pack()
self.statusLabelVar = StringVar()
self.statusLabel = Label( master, textvariable=self.statusLabelVar )
self.statusLabel.pack()
self.clickMeButton = Button(
self.master, text="Click Me", command=self.onClickMeClicked )
self.clickMeButton.pack()
self.clickCountLabelVar = StringVar()
self.clickCountLabel = Label( master, textvariable=self.clickCountLabelVar )
self.clickCountLabel.pack()
self.threadedButton = Button(
self.master, text="Timer", command=self.onTimerClicked )
self.threadedButton.pack()
self.timerCountLabelVar = StringVar()
self.timerCountLabel = Label( master, textvariable=self.timerCountLabelVar )
self.timerCountLabel.pack()
self.timerCounter_=0
self.clickCounter_=0
self.bgTask = BackgroundTask( self.myLongProcess )
self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )
def close( self ) :
print "close"
try: self.bgTask.stop()
except: pass
try: self.timer.stop()
except: pass
self.master.quit()
def onThreadedClicked( self ):
print "onThreadedClicked"
try: self.bgTask.start()
except: pass
def onTimerClicked( self ) :
print "onTimerClicked"
self.timer.start()
def onStopClicked( self ) :
print "onStopClicked"
try: self.bgTask.stop()
except: pass
try: self.timer.stop()
except: pass
def onClickMeClicked( self ):
print "onClickMeClicked"
self.clickCounter_+=1
self.clickCountLabelVar.set( str(self.clickCounter_) )
def onTimer( self ) :
print "onTimer"
self.timerCounter_+=1
self.timerCountLabelVar.set( str(self.timerCounter_) )
def myLongProcess( self, isRunningFunc=None ) :
print "starting myLongProcess"
for i in range( 1, 10 ):
try:
if not isRunningFunc() :
self.onMyLongProcessUpdate( "Stopped!" )
return
except : pass
self.onMyLongProcessUpdate( i )
sleep( 1.5 ) # simulate doing work
self.onMyLongProcessUpdate( "Done!" )
def onMyLongProcessUpdate( self, status ) :
print "Process Update: %s" % (status,)
self.statusLabelVar.set( str(status) )
root = Tk()
gui = UnitTestGUI( root )
root.protocol( "WM_DELETE_WINDOW", gui.close )
root.mainloop()
if __name__ == "__main__":
tkThreadingTest()
关于BackgroundTask,我要强调的两个要点:
1)您在后台任务中运行的函数需要采用一个函数指针,它将调用并遵守,这允许任务在中途取消 - 如果可能的话。
2)您需要确保退出应用程序时后台任务已停止。如果你不解决这个问题,即使你的图形用户界面关闭,该线程仍然会运行!
我使用了 RxPY,它有一些很好的线程函数,可以以相当干净的方式解决这个问题。没有队列,并且我提供了一个在后台线程完成后在主线程上运行的函数。这是一个工作示例:
import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk
class UI:
def __init__(self):
self.root = tk.Tk()
self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
self.button = tk.Button(text="Do Task", command=self.do_task).pack()
def do_task(self):
rx.empty().subscribe(
on_completed=self.long_running_task,
scheduler=self.pool_scheduler
)
def long_running_task(self):
# your long running task here... eg:
time.sleep(3)
# if you want a callback on the main thread:
self.root.after(5, self.on_task_complete)
def on_task_complete(self):
pass # runs on main thread
if __name__ == "__main__":
ui = UI()
ui.root.mainloop()
使用此构造的另一种方法可能更干净(取决于偏好):
tk.Button(text="Do Task", command=self.button_clicked).pack()
...
def button_clicked(self):
def do_task(_):
time.sleep(3) # runs on background thread
def on_task_done():
pass # runs on main thread
rx.just(1).subscribe(
on_next=do_task,
on_completed=lambda: self.root.after(5, on_task_done),
scheduler=self.pool_scheduler
)
问题是 t.join() 阻塞了点击事件,主线程没有返回到事件循环来处理重绘。 请参阅 为什么 ttk 进度条出现在 Tkinter 中的进程之后 或 TTK 进度条在发送电子邮件时被阻止
以下是我在 StackOverflow 上与 tkinter 和线程相关的答案列表:
在我的程序中,主线程将在其他线程启动之前结束。为此,我添加了以下内容:
while True: #Keeps MainThread active forever
time.sleep(1)
在 VSCode(和其他程序)中,您可以启用调试并运行,并检查主线程的“调用堆栈”区域。如果您在错误位置附近没有看到它,则意味着此时它未处于活动状态。尝试使用上面的代码来解决这个问题。