PyQt5 - 将多重处理用于 matplotlib 目的

问题描述 投票:0回答:1

使用这段代码,我试图绘制一个持续时间=125毫秒的 pydub.AudioSegment 信号。 绘图时间窗口(x 轴)为 3 秒。

代码:

import time
from PyQt5.QtCore import pyqtSignal, QThread
from multiprocessing import Process, Queue, Pipe
from datetime import datetime, timedelta
import traceback
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.dates import num2date
from matplotlib.ticker import FuncFormatter
import numpy as np


class Final_Slice_Plot:

    def __init__(self, main_self):
        self.main_self = main_self

        # chart
        self.chart = Canvas(self)
        self.chart.ax.set_facecolor((1, 1, 1))
        self.chart.ax.tick_params(labelcolor='white')

        # create process
        self.process_number = 94
        self.final_slice_plot_mother_pipe, self.final_slice_plot_child_pipe = Pipe()
        self.final_slice_plot_queue = Queue()
        self.final_slice_plot_emitter = Final_Slice_Plot_Emitter(self.final_slice_plot_mother_pipe)
        self.final_slice_plot_emitter.error_signal.connect(lambda error_message: print(error_message))
        self.final_slice_plot_emitter.plot_data_signal.connect(lambda plot_data: self.plot(plot_data))

        self.final_slice_plot_emitter.start()
        self.final_slice_plot_child_process = Final_Slice_Plot_Child_Proc(self.final_slice_plot_child_pipe, self.final_slice_plot_queue)
        self.final_slice_plot_child_process.start()

        counter = 0
        for process in self.main_self.manage_processes_instance.processes:
            if "process_number" in process:
                if process["process_number"] == self.process_number:
                    self.main_self.manage_processes_instance.processes[counter][
                        "pid"] = self.final_slice_plot_child_process.pid
                    self.main_self.manage_processes_instance.processes[counter]["start_datetime"] = datetime.now()
                    self.main_self.manage_processes_instance.processes[counter]["status"] = "in_progress"
            counter += 1

        if self.main_self.manage_proccesses_window_is_open:
            self.main_self.manage_proccesses_window_support_code.manage_proccesses_queue.put(
                {"type": "table-update", "processes": self.main_self.manage_processes_instance.processes})

    def close(self):
        try:
            try:
                if self.final_slice_plot_child_process is not None:
                    self.final_slice_plot_child_process.terminate()
            except:
                print(traceback.format_exc())
            try:
                if self.final_slice_plot_emitter is not None:
                    self.final_slice_plot_emitter.terminate()
            except:
                print(traceback.format_exc())

            counter = 0
            for process in self.main_self.manage_processes_instance.processes:
                if "process_number" in process:
                    if process["process_number"] == self.process_number:
                        self.main_self.manage_processes_instance.processes[counter]["pid"] = None
                        self.main_self.manage_processes_instance.processes[counter]["start_datetime"] = None
                        self.main_self.manage_processes_instance.processes[counter]["status"] = "stopped"
                        self.main_self.manage_processes_instance.processes[counter]["cpu"] = 0
                        self.main_self.manage_processes_instance.processes[counter]["ram"] = 0
                counter += 1
            if self.main_self.manage_proccesses_window_is_open:
                self.main_self.manage_proccesses_window_support_code.manage_proccesses_queue.put(
                    {"type": "table-update", "processes": self.main_self.manage_processes_instance.processes})
            self.clear_plot()
        except:
            error_message = traceback.format_exc()
            print(error_message)


    # signal for plot_data_signal
    def plot(self, plot_data):
        try:
            x_vals = plot_data[0]
            y_vals = plot_data[1]

            self.chart.li.set_xdata(x_vals)
            self.chart.li.set_ydata(y_vals)

            x_ticks = []
            if (len(x_vals) > 0):
                for i in range(499, 2500 + 1, 1000):
                    tick = x_vals[0] + timedelta(milliseconds=i)
                    x_ticks.append(tick)
                plt.xticks(x_ticks)

            self.chart.ax.set_xlim(x_vals[0], x_vals[0] + timedelta(milliseconds=3000))
            self.chart.ax.xaxis.set_major_formatter(FuncFormatter(self.date_formatter_1))

            self.chart.fig.canvas.draw()
            self.chart.fig.canvas.flush_events()

        except:
            error_message = str(traceback.format_exc())
            print(error_message)


    # Clear the matplotlib plot
    def clear_plot(self):
        try:
            x_vals = [datetime.now()]
            y_vals = [0]

            self.chart.li.set_xdata(x_vals)
            self.chart.li.set_ydata(y_vals)

            x_ticks = []
            if (len(x_vals) > 0):
                for i in range(499, 2500 + 1, 1000):
                    tick = x_vals[0] + timedelta(milliseconds=i)
                    x_ticks.append(tick)
                plt.xticks(x_ticks)

                self.chart.ax.set_xlim(x_vals[0], x_vals[0] + timedelta(milliseconds=3000))

            self.chart.ax.xaxis.set_major_formatter(FuncFormatter(self.date_formatter_1))

            self.chart.fig.canvas.draw()
            self.chart.fig.canvas.flush_events()

        except Exception as e:
            error_message = str(traceback.format_exc())
            print(error_message)

    # Formats the x-axis of matplotlib plot
    def date_formatter_1(self, a, b):
        try:
            t = num2date(a)
            ms = str(t.microsecond)[:1]
            res = f"{t.hour:02}:{t.minute:02}:{t.second:02}.{ms}"
            # res = f"{t.hour:02}:{t.minute:02}:{t.second:02}"
            return res
        except Exception as e:
            error_message = str(traceback.format_exc())
            print(error_message)

class Final_Slice_Plot_Emitter(QThread):
    try:
        error_signal = pyqtSignal(str)
        plot_data_signal = pyqtSignal(list)
    except:
        pass

    def __init__(self, from_process: Pipe):
        try:
            super().__init__()
            self.data_from_process = from_process
        except:
            pass

    def run(self):
        try:
            while True:
                '''if self.data_from_process.poll():
                    data = self.data_from_process.recv()
                else:
                    time.sleep(0.1)
                    continue
                '''
                data = self.data_from_process.recv()
                if data["type"] == "error":
                    self.error_signal.emit(data["error_message"])
                elif data["type"]=="plot_data":
                    self.plot_data_signal.emit(data["plot_data"])
        except:
            error_message = traceback.format_exc()
            self.error_signal.emit(error_message)

class Final_Slice_Plot_Child_Proc(Process):

    def __init__(self, to_emitter, from_mother):
        try:
            super().__init__()
            self.daemon = False
            self.to_emitter = to_emitter
            self.data_from_mother = from_mother

        except:
            try:
                error_message = str(traceback.format_exc())
                to_emitter.send({"type": "error", "error_message": error_message})
            except:
                pass

    def run(self):
        try:
            self.TIME_WINDOW = 3000
            self.chunk_number = 0
            self.current_duration_milliseconds = 0
            self.now = datetime.now()
            self.x_vals = np.array([])
            self.y_vals = np.array([])
            while(True):
                data = self.data_from_mother.get()
                if data["type"] == "slice":
                    slice = data["slice"]

                chunk_time = len(slice)
                samples = slice.get_array_of_samples()
                left_samples = samples[::2]
                right_samples = samples[1::2]
                left_audio_data = np.frombuffer(left_samples, np.int16)[::128]  # down sampling
                right_audio_data = np.frombuffer(right_samples, np.int16)[::128]  # down sampling
                audio_data = np.vstack((left_audio_data, right_audio_data)).ravel('F')

                time_data = np.array([])
                for i in range(0, len(audio_data)):
                    time_data = np.append(time_data, self.now)
                    self.now = self.now + timedelta(milliseconds=chunk_time / len(audio_data))

                self.x_vals = np.concatenate((self.x_vals, time_data))
                self.y_vals = np.concatenate((self.y_vals, audio_data))

                if (self.x_vals.size > audio_data.size * (self.TIME_WINDOW / chunk_time)):
                    self.x_vals = self.x_vals[audio_data.size:]
                    self.y_vals = self.y_vals[audio_data.size:]

                plot_data_values = [self.x_vals, self.y_vals]
                self.to_emitter.send({"type": "plot_data", "plot_data": plot_data_values})
                self.now = datetime.now()

                self.chunk_number += 1
                self.current_duration_milliseconds += chunk_time

        except:
            error_message = str(traceback.format_exc())
            self.to_emitter.send({"type": "error", "error_message": error_message})


class Canvas(FigureCanvas):

    def __init__(self, parent):
        try:
            self.fig, self.ax = plt.subplots(figsize=(5, 4), dpi=200)
            super().__init__(self.fig)

            self.fig.patch.set_facecolor((6 / 255, 21 / 255, 154 / 255))

            self.ax.set_position([0., 0, 1., 0.8])
            self.ax.xaxis.tick_top()
            self.ax.tick_params(axis='both', which='major', pad=1, length=2.4, width=0.5, color=(1, 1, 1))

            parent.main_self.ui.verticalLayout_3.addWidget(self)

            self.now = datetime.now()
            self.chart_stop = self.now + timedelta(milliseconds=3000)

            plt.cla()

            plt.gca().xaxis.set_major_formatter(FuncFormatter(parent.date_formatter_1))

            plt.xticks(fontsize=3)
            self.ax.grid(False)

            self.ax.set_ylim(-32768, 32768)

            x_ticks = []
            for i in range(499, 2500 + 1, 1000):
                tick = self.now + timedelta(milliseconds=i)
                x_ticks.append(tick)
            plt.xticks(x_ticks)

            self.ax.set_xlim(self.now, self.now + timedelta(milliseconds=3000))


            self.li, = self.ax.plot([self.now, self.now + timedelta(milliseconds=3000)], [0, 0], color=(0, 1, 0.29),
                                    linestyle='solid', marker=",")

            self.show()
        except:
            error_message = traceback.format_exc()
            print(error_message)

错误:有时当我关闭窗口时,应用程序不会返回,直到通过 PyCharm 停止按钮(如 Ctrl+C)硬停止。

我做错了什么?

python pyqt5 multiprocessing qthread
1个回答
0
投票

当父进程死掉而没有正确终止子进程时就会卡住,解决这个问题的方法是

  1. 在输入队列上设置超时
data = self.data_from_mother.get(timeout=5)
  1. 在子进程的
    multiprocessing.Queue
    中使用
    multiprocessing.Pipe
    代替
    to_emitter
    ,因为管道满时会阻塞,而队列不会(因为它内部使用线程),这可以防止子进程无限期地卡住写入一根满了的管道,没有人从另一端读取数据。
  2. 让子进程从 queue.Empty(超时)异常中终止,这样您就不会陷入无限循环。
except queue.Empty:
    exit(0)
except Exception:
    # your logging logic here
© www.soinside.com 2019 - 2024. All rights reserved.