How can I sleep for long periods in PyQt threads?

Question

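I have a number of particular objects that each need to run a specific function again and again, at ever-changing intervals, until they decide they are done.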

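For example, one object may need to wait 30 seconds, run, wait 60 seconds, run, wait 10 seconds, run... You get the point. This goes on for 30-120 different objects, all running exactly the same function.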

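I thought that simply having a function that sleeps for the required interval would solve my problem. But, correct me if I'm wrong, I remembered that a thread pool can only run a certain number of threads at any given time (12 for me). How do I get around this limit?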

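import random
import time

from PyQt5.QtCore import QThreadPool
from PyQt5.QtWidgets import QMainWindow


class Thing(object):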
    def getCurrentPeriod(self):
        return random.randint(5, 30) # Some ever changing period of time

    def refresh(self):
        doThings() # A long running task that is disk and network intensive

    def waitRefresh(self):
        period = self.getCurrentPeriod()
        time.sleep(period) # Wait that period out
        self.refresh()
        return self.needRefresh()
        # Returns a boolean saying whether this Thing needs another cycle. I'm not sure how
        # to reschedule, or specifically where to connect the worker's signal when it finishes
        # so that this *specific* Thing object gets its waitRefresh function called again.

class App(QMainWindow):
    def __init__(self, *args, **kwargs):
        super(App, self).__init__(*args, **kwargs)

        self.threadpool = QThreadPool()

        # Add initial objects to pool (other portions of app may add more over time)
        for thing in self.acquireThings():
            worker = Worker(thing.waitRefresh)
            self.threadpool.start(worker)

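The WorkerSignals class and the QRunnable subclass are not included here; this example includes what I usually do. That example tackles the same problem, but in a (most likely) inefficient way.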

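Edit: New example, a complete working example showing that time.sleep does not release the thread to let other workers run. I feel that async may be the only way to implement this, but is there a quick fix so that I don't have to change my entire application?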

Here's what it looks like when you try to sleep more than 12 threads.
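
The effect can be reproduced without Qt. The following is a minimal sketch under stated assumptions, not the asker's code: the standard library's `ThreadPoolExecutor` stands in for `QThreadPool`, and `sleep_then_refresh` and `run_sleeping_in_pool` are hypothetical names. Every item sleeps inside a pool worker, so once there are more items than workers the extra items queue behind the sleepers. (Qt documents the default `maxThreadCount()` as `QThread::idealThreadCount()`, which is presumably where the 12 comes from.)

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleep_then_refresh(item_id, period):
    """Hypothetical stand-in for waitRefresh(): the wait happens on the worker thread."""
    time.sleep(period)  # the pool thread is held for the whole wait
    return item_id


def run_sleeping_in_pool(n_items, period, pool_size):
    """Run n_items sleepers on pool_size workers; return (elapsed seconds, results)."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(sleep_then_refresh, i, period) for i in range(n_items)]
        results = [f.result() for f in futures]
    return time.monotonic() - start, results
```

With 4 items, a 0.1 second wait and 2 workers, the waits run in two batches, so the call takes roughly twice the wait rather than one wait.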

multithreading pyqt pyqt5 sleep
1 Answer

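The eventual solution came when I decided to actually try the QTimer class. There may be more optimized solutions, but this one seems to tick all the boxes, even though it is very simple.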

import random
import time
import traceback

from functools import partial
from PyQt5.QtCore import *
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import *


class WorkerSignals(QObject):
    """
    Represents the signals a Worker can emit.
    """
    finished = pyqtSignal()
    starting = pyqtSignal(int) # ID of thread
    result = pyqtSignal(tuple) # Tuple refresh result, result and ID

class Worker(QRunnable):
    """
    A worker designed to tell when it's starting, when it's finished and the result.
    Designed to work around Thread.refresh().
    """

    def __init__(self, fn, thread_id, *args, **kwargs):
        super(Worker, self).__init__()

        # Store constructor arguments (re-used for processing)
        self.fn = fn
        self.id = thread_id
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """
        Runs a given method, and emits the result with the Worker's coordinated ID.
        """
        try:
            self.signals.starting.emit(self.id) # Thread is now finally ready to work.
            result = self.fn(*self.args, **self.kwargs) # Refresh Thread!
            self.signals.result.emit(result) # Thread is finished, emit result tuple.
        except Exception:  # Don't swallow KeyboardInterrupt/SystemExit
            traceback.print_exc()
        finally:
            self.signals.finished.emit()  # Done


class Thread(object):
    """
    Basic rules for a Thread object:
    Cannot store the next timestamp on the object (it's a database object, and I don't believe it's
    good practice to create sessions over and over simply to read/write the access time).
    The ID and the active flag may be stored.
    """
    i = -1

    def __init__(self):
        self.id = Thread.nextID()
        self.active = True
        self.refreshes = 0

    def refresh(self) -> tuple:
        """
        'Refreshes' a thread. Waits a specific period, then decides whether the Thread object should be
        deactivated or kept for additional refreshes. Chance of deactivation lowers with each refresh.
        :return: The refresh result, a tuple with a boolean and the thread's ID (for identifying it later)
        """

        # Represents my SQL Alchemy Model's refresh() function
        self.refreshes += 1
        time.sleep(random.randint(2, 5))
        if random.random() <= max(0.1, 1.0 - ((self.refreshes + 5) / 10)):
            self.active = False
        return self.active, self.id

    @staticmethod
    def getRefreshTime() -> float:
        """
        Represents the amount of time before a thread should be refreshed.
        Should NOT be used to determine whether the thread is still active or not.

        :return: The time period that should be waited.
        """

        return random.uniform(10, 300)

    @staticmethod
    def nextID() -> int:
        """
        Returns integer thread IDs in sequence to remove possibility of duplicate IDs.
        :return: Integer Thread ID
        """
        Thread.i += 1
        return Thread.i

    def __repr__(self):
        return f'Thread(id={self.id} active={self.active})'


class MainWindow(QMainWindow):
    """
    GUI containing a Label, Button and ListWidget showing all the active sleeping/working threads.
    Manages a threadpool, a number of background singleshot timers, etc.
    """

    def __init__(self, *args, **kwargs):
        super(MainWindow, self).__init__(*args, **kwargs)

        # Widgets Setup
        layout = QVBoxLayout()
        self.list = QListWidget()
        self.l = QLabel("Total Active: 0")
        self.button = QPushButton("Refresh List")
        self.button.pressed.connect(self.refreshList)
        self.button.setDisabled(True)
        layout.addWidget(self.l)
        layout.addWidget(self.button)
        layout.addWidget(self.list)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()

        # Periodically add threads to the pool.
        self.poolTimer = QTimer()
        self.poolTimer.setInterval(5_000)
        self.poolTimer.timeout.connect(self.addThreads)

        # Threading Setup
        self.threadpool = QThreadPool()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

        self.active, self.threads = {}, {}
        # Add a number of threads to start with.
        for _ in range(random.randint(5, 16)):
            self.setupThread(Thread())
        self.poolTimer.start()

    def refreshList(self):
        """
        Refreshes the ListWidget in the GUI with all the active/sleeping/working threads.
        """
        self.list.clear()
        bold = QFont()
        bold.setBold(True)

        active = 0
        for thread in self.threads.values():
            item = QListWidgetItem(
                f'Thread {thread.id}/{thread.refreshes}')
            # Bold a thread if it's working
            if self.active[thread.id]:
                active += 1
                item.setFont(bold)
            self.list.addItem(item)
        self.l.setText(f'Total Active: {active}/{len(self.threads)}')

    def refreshResult(self, result) -> None:
        """
        When a thread is finished, the result determines its next course of action, which is either
        to return to the pool again, or to be deleted.

        :param result: A tuple containing the result (bool) and the connected Thread ID.
        """
        self.active[result[1]] = False
        if result[0]:
            print(f'Restarting Thread {result[1]}')
            self.setupThread(self.threads[result[1]]) # Add by ID, which would normally be a database GET
        else:
            print(f'Thread {result[1]} shutting down.')
            del self.active[result[1]]
            del self.threads[result[1]]
        self.refreshList()

    def updateActivity(self, thread_id) -> None:
        """
        Connected to the starting signal, helps signal when a thread is actually being refreshed.

        :param thread_id: The Thread ID
        """
        print(f'Thread {thread_id} is now active/working.')
        self.active[thread_id] = True

    def refresh(self, thread):
        """
        Adds a new worker to the threadpool to be refreshed.
        Can't be considered a real start to the thread.refresh function, as the pool only runs
        maxThreadCount() workers at any time (12 on my machine).
        The 'starting' signal can tell us when a specific thread is actually being refreshed, and is represented
        as a Bold element in the list.

        :param thread: A thread instance.
        """
        print(f'Adding Thread {thread.id} to the pool.')
        worker = Worker(thread.refresh, thread_id=thread.id)
        worker.signals.result.connect(self.refreshResult)
        worker.signals.starting.connect(self.updateActivity)
        self.threadpool.start(worker)
        # self.active[thread.id] = True
        self.refreshList()

    def setupThread(self, thread) -> None:
        """
        Adds a new timer designated to start a specific thread.
        :param thread: A thread instance.
        """
        self.active[thread.id] = False
        self.threads[thread.id] = thread
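        period = thread.getRefreshTime()
        # singleShot is a static method, so no QTimer instance is needed.
        # It expects whole milliseconds, so the float period is converted to int.
        QTimer.singleShot(int(period * 1000), partial(self.refresh, thread=thread))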
        print(f'Thread {thread.id} will start in {period} seconds.')
        self.refreshList()

    def addThreads(self):
        """
        Tops up the tracked threads to roughly 30. Called automatically every 5 seconds by poolTimer.
        """

        add = max(0, 30 + random.randint(-5, 5) - len(self.threads))
        if add > 0:
            print(f'Adding {add} thread{"s" if add > 1 else ""}.')
            for _ in range(add):
                self.setupThread(Thread())

app = QApplication([])
window = MainWindow()
app.exec_()

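When a Thread is requested, a timer is created and singleShot is fired on an extra function that adds the Thread to the thread pool. The pool can run up to 12 "refreshing" threads at once, and the signals allow the GUI to update the moment a change is found.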
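
The same division of labour can be sketched without Qt. This is only an analogy under stated assumptions, not the code above: the standard library's `sched.scheduler` plays the role of the event loop's single-shot timers, `ThreadPoolExecutor` plays the role of `QThreadPool`, and `refresh` and `run_scheduled` are hypothetical names. The point it illustrates is that the waiting happens in one scheduler queue, so a small pool is only occupied while real work runs.

```python
import sched
import time
from concurrent.futures import ThreadPoolExecutor


def refresh(item_id):
    """Hypothetical stand-in for the slow refresh() call; returns the id it was given."""
    return item_id


def run_scheduled(n_items, delay, pool_size=2):
    """Wait `delay` seconds per item in one scheduler, then refresh each item in the pool.

    Returns (elapsed seconds, sorted list of refreshed ids).
    """
    scheduler = sched.scheduler(time.monotonic, time.sleep)
    futures = []
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:

        def submit(item_id):
            # Fired by the scheduler when the item's timer expires.
            futures.append(pool.submit(refresh, item_id))

        for item_id in range(n_items):
            # The wait lives in the scheduler's queue, not on a pool thread.
            scheduler.enter(delay, 1, submit, argument=(item_id,))
        scheduler.run()  # blocks until every timer has fired
        results = sorted(f.result() for f in futures)
    return time.monotonic() - start, results
```

Calling `run_scheduled(8, 0.2)` waits for all eight items at the same time, so the elapsed time stays close to a single wait even though the pool has only two workers.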

Thousands of "Thread" objects can be waiting at once, and singleShot seems able to add them to the pool exactly when they need to be added.

The signals help distinguish when a thread is sleeping, working, or active (inactive Thread objects are removed immediately).
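
The rescheduling and state tracking can be sketched in plain Python as well. This is a simplified analogy under stated assumptions, not the PyQt code above: a heap of due times replaces the timers, a `queue.Queue` replaces the `result` signal, and `run_until_idle` and its parameters are hypothetical. Unlike the `starting` signal in the answer, this sketch marks an item as "working" when it is submitted, not when a worker actually picks it up.

```python
import heapq
import itertools
import queue
import time
from concurrent.futures import ThreadPoolExecutor


def run_until_idle(refresh_counts, delay=0.05, pool_size=2):
    """Refresh each item refresh_counts[item_id] times, then retire it.

    Returns the (item_id, state) transitions in the order they happened.
    """
    remaining = dict(refresh_counts)
    state = {}                  # item_id -> "sleeping" or "working"
    history = []
    due = []                    # heap of (due_time, tie_breaker, item_id)
    tie = itertools.count()
    finished = queue.Queue()    # workers report (item_id, keep_going) here

    def set_state(item_id, new_state):
        state[item_id] = new_state
        history.append((item_id, new_state))

    def schedule(item_id):
        set_state(item_id, "sleeping")
        heapq.heappush(due, (time.monotonic() + delay, next(tie), item_id))

    def refresh(item_id):
        # Runs on a pool thread; stands in for the slow refresh() call.
        remaining[item_id] -= 1
        finished.put((item_id, remaining[item_id] > 0))

    for item_id in remaining:
        schedule(item_id)

    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        while state:
            now = time.monotonic()
            # Hand every item whose timer has expired to the pool.
            while due and due[0][0] <= now:
                _, _, item_id = heapq.heappop(due)
                set_state(item_id, "working")
                pool.submit(refresh, item_id)
            # Wait for a result, but no longer than the next timer.
            timeout = max(0.0, due[0][0] - now) if due else None
            try:
                item_id, keep_going = finished.get(timeout=timeout)
            except queue.Empty:
                continue
            if keep_going:
                schedule(item_id)    # comparable to setupThread() in the answer
            else:
                del state[item_id]   # comparable to the deletion in refreshResult()
                history.append((item_id, "retired"))
    return history
```

For example, `run_until_idle({"a": 2, "b": 1})` records two sleeping/working cycles for "a" and one for "b", and each item is retired after its last refresh.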

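The only open questions I can think of with this program are:

1) Can a QThread implementation beat it?

2) What happens to a QTimer once its singleShot function has executed and fired? Will it be properly garbage collected, or can thousands of them build up in the background and consume resources?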
