我有一个耗时的线程操作,但它在处理过程中无法发出进度。因此,我使用另一个线程来模拟其进度。当耗时操作完成时,它会发出结束信号,同时发出模拟过程结束的信号。然而,模拟线程中的实际函数操作不受控制。在下面的示例中,
self.thread_two_stop_signal.emit()
,但是,handle_two()
仍在运行。
import sys
import time
from PyQt6.QtCore import QObject, pyqtSignal, QThread
from PyQt6.QtWidgets import QApplication, QMainWindow, QProgressBar, QPushButton
class ThreadOne(QObject):
done_signal = pyqtSignal()
finished_signal = pyqtSignal()
def __init__(self):
super().__init__()
def run(self):
for i in range(100):
time.sleep(0.01)
self.done_signal.emit()
def finished(self):
self.finished_signal.emit()
class ThreadTwo(QObject):
finished_signal = pyqtSignal()
progress_signal = pyqtSignal(int)
def __init__(self):
self.if_finished = False
super().__init__()
def run(self):
i = 0
while True:
if self.if_finished or i == 99:
self.progress_signal.emit(i)
return
i += 1
self.progress_signal.emit(i)
time.sleep(0.1)
def finished(self):
self.finished_signal.emit()
def reset(self):
self.if_finished = False
def stop(self):
print("stop")
self.if_finished = True
class MainWindow(QMainWindow):
thread_one_do_signal = pyqtSignal()
thread_one_finished_signal = pyqtSignal()
thread_two_do_signal = pyqtSignal()
thread_two_reset_signal = pyqtSignal()
thread_two_stop_signal = pyqtSignal()
thread_two_finished_signal = pyqtSignal()
def __init__(self):
super().__init__()
self.setWindowTitle("My PyQt6 App")
self.setGeometry(100, 100, 400, 200)
self.btn = QPushButton("Start", self)
self.btn.setGeometry(150, 25, 50, 50)
self.bar = QProgressBar(self)
self.bar.setGeometry(50, 100, 300, 20)
self.btn.clicked.connect(self.start)
self.proxy_thread_one = QThread()
self.thread_one = ThreadOne()
self.thread_one.moveToThread(self.proxy_thread_one)
self.thread_one_do_signal.connect(self.thread_one.run)
self.thread_one_finished_signal.connect(self.thread_one.finished)
self.thread_one.done_signal.connect(self.handle_one)
self.thread_one.finished_signal.connect(self.proxy_thread_one.quit)
self.thread_one.finished_signal.connect(self.proxy_thread_one.wait)
self.proxy_thread_two = QThread()
self.thread_two = ThreadTwo()
self.thread_two.moveToThread(self.proxy_thread_two)
self.thread_two_do_signal.connect(self.thread_two.run)
self.thread_two_stop_signal.connect(self.thread_two.stop)
self.thread_two_reset_signal.connect(self.thread_two.reset)
self.thread_two_finished_signal.connect(self.thread_two.finished)
self.thread_two.progress_signal.connect(self.handle_two)
self.thread_two.finished_signal.connect(self.proxy_thread_two.quit)
self.thread_two.finished_signal.connect(self.proxy_thread_two.wait)
def start(self):
self.proxy_thread_one.start()
self.proxy_thread_two.start()
self.thread_one_do_signal.emit()
self.thread_two_reset_signal.emit()
self.thread_two_do_signal.emit()
def handle_one(self):
self.thread_two_stop_signal.emit()
self.thread_two_finished_signal.emit()
self.thread_one_finished_signal.emit()
self.bar.setValue(100)
def handle_two(self, value):
self.bar.setValue(value)
if __name__ == "__main__":
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
sys.exit(app.exec())
断开信号后,它开始工作。
def handle_one(self):
self.thread_two.progress_signal.disconnect(self.handle_two)
self.thread_two_stop_signal.emit()
self.thread_two_finished_signal.emit()
self.thread_one_finished_signal.emit()
self.bar.setValue(100)
我想了解为什么我之前的方法不起作用以及是否有解决方案。
另外,我想询问一下我在PyQt6中对线程的使用情况。我的做法正确吗?能优雅退出吗?是否存在潜在的线程安全问题?
该示例的主要问题是它在每个线程内使用阻塞循环,这将阻止立即处理线程本地事件。
当跨线程发出信号时,事件将被发送到接收线程的事件循环。但是,如果在接收工作线程中执行阻塞循环,它将以与主 GUI 线程中完全相同的方式冻结事件处理。因此必须采取措施明确强制处理此类线程本地事件。实现此目的最简单的方法是调用
QApplication.processEvents()
,如下所示:
class ThreadTwo(QObject):
...
def run(self):
i = 0
while True:
QApplication.processEvents()
if self.if_finished or i == 99:
self.progress_signal.emit(i)
return
...
至于示例中线程是否“正确”使用的更普遍的问题:这很大程度上是一个意见/品味的问题。通过删除大部分自定义信号并直接调用
stop()
可以完全绕过上述问题。严格来说,这意味着修改 if_finished
属性不再是线程安全的 - 但考虑到只有一个线程需要读取 if_finished
的值,这不会对代码的可靠性产生明显的影响。事实上,可以说,由此产生的简化将使代码更易于理解和维护,因此从这个意义上说,“定性”更可靠。要了解此类代码的外观,请尝试下面重写的示例:
import sys, random
from PyQt6.QtCore import QObject, pyqtSignal, QThread
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QProgressBar, QPushButton,
QWidget, QHBoxLayout,
)
class WorkerOne(QObject):
finished = pyqtSignal()
def run(self):
delay = random.randint(25, 50)
for i in range(100):
QThread.msleep(delay)
self.finished.emit()
class WorkerTwo(QObject):
progress = pyqtSignal(int)
def run(self):
self._stopped = False
for i in range(1, 101):
QThread.msleep(50)
if not self._stopped:
self.progress.emit(i)
else:
self.progress.emit(100)
break
def stop(self):
print('stop')
self._stopped = True
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("My PyQt6 App")
self.setGeometry(600, 200, 400, 50)
widget = QWidget()
layout = QHBoxLayout(widget)
self.btn = QPushButton("Start")
self.bar = QProgressBar()
layout.addWidget(self.bar)
layout.addWidget(self.btn)
self.setCentralWidget(widget)
self.btn.clicked.connect(self.start)
self.thread_one = QThread()
self.worker_one = WorkerOne()
self.worker_one.moveToThread(self.thread_one)
self.thread_one.started.connect(self.worker_one.run)
self.worker_one.finished.connect(self.handle_finished)
self.thread_two = QThread()
self.worker_two = WorkerTwo()
self.worker_two.moveToThread(self.thread_two)
self.thread_two.started.connect(self.worker_two.run)
self.worker_two.progress.connect(self.bar.setValue)
def start(self):
if not (self.thread_one.isRunning() or
self.thread_two.isRunning()):
self.thread_one.start()
self.thread_two.start()
def handle_finished(self):
self.worker_two.stop()
self.reset()
def reset(self):
self.thread_one.quit()
self.thread_two.quit()
self.thread_one.wait()
self.thread_two.wait()
def closeEvent(self, event):
self.reset()
if __name__ == "__main__":
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
sys.exit(app.exec())