尝试从子进程发送绘图数据时出现 multiprocessing 管道断开错误(BrokenPipeError)

问题描述 投票:0回答:1

我用 PyQt5 做了一个测试麦克风的应用程序,其中使用了 pydub 和 pyaudio 模块,并用 matplotlib 绘制麦克风数据。程序中有一个 QDialog:它启动一个发射器线程(QThread)负责与对话框通信,并启动一个子进程(multiprocessing)从 pyaudio 输入流中读取数据。当我在界面中选择对麦克风声音做标准化(normalize)时,即使不说话也会听到噪音;而且在这种情况下,运行几分钟后应用程序会崩溃,并出现以下错误:

Traceback (most recent call last):
  File "C:\Users\chris\Documents\My Projects\papinhio-player\src\python+\main-window\../..\python+\menu-1\manage-input-and-output-sound-devices\microphone-input-device-settings\microphone-input-device-setting.py", line 866, in run
    self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
  File "C:\Python\Lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Python\Lib\multiprocessing\connection.py", line 301, in _send_bytes
    nwritten, err = ov.GetOverlappedResult(True)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 109] Η διοχέτευση έχει τερματιστεί

Process Manage_Input_Device_Child_Proc-1:
Traceback (most recent call last):
  File "C:\Users\chris\Documents\My Projects\papinhio-player\src\python+\main-window\../..\python+\menu-1\manage-input-and-output-sound-devices\microphone-input-device-settings\microphone-input-device-setting.py", line 866, in run
    self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
  File "C:\Python\Lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Python\Lib\multiprocessing\connection.py", line 301, in _send_bytes
    nwritten, err = ov.GetOverlappedResult(True)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 109] Η διοχέτευση έχει τερματιστεί

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python\Lib\multiprocessing\process.py", line 314, in _bootstrap
    self.run()
  File "C:\Users\chris\Documents\My Projects\papinhio-player\src\python+\main-window\../..\python+\menu-1\manage-input-and-output-sound-devices\microphone-input-device-settings\microphone-input-device-setting.py", line 875, in run
    self.to_emitter.send({"type":"error","error_message":error_message})
  File "C:\Python\Lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Python\Lib\multiprocessing\connection.py", line 289, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 232] Η διοχέτευση κλείνει

Process finished with exit code -1073741571 (0xC00000FD)

该错误与绘图数据的发送有关。如果我注释掉这一行:

self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
,那么就不会出现应用程序崩溃。

相关代码:

class Manage_Input_Device_Emitter(QThread):
    """Relay messages received from a pipe connection as Qt signals.

    The thread reads dict messages from ``from_process`` and re-emits them,
    keyed on ``data["type"]``, through the signals declared below.
    """

    # Declared directly in the class body: wrapping these in try/except would
    # only hide a failure and leave the class without its signals.
    plot_data_signal = pyqtSignal(list,float)
    save_finished = pyqtSignal()
    devices_settings = pyqtSignal(list,int,float,float,float,float,float)
    error_signal = pyqtSignal(str)

    def __init__(self, from_process: Pipe):
        """Remember the connection to read from.

        Args:
            from_process: connection-like object providing ``poll(timeout)``
                and ``recv()``. NOTE(review): the annotation names ``Pipe``,
                which is a factory function rather than a type - presumably
                this is the receiving end returned by ``Pipe()``; confirm.
        """
        # Let construction errors propagate: swallowing them would hand the
        # caller a half-initialised thread object.
        super().__init__()
        self.data_from_process = from_process

    def run(self):
        """Forward incoming messages until interrupted or the pipe closes."""
        try:
            # Polling with a timeout (instead of blocking forever in recv())
            # lets requestInterruption() stop the thread.
            while not self.isInterruptionRequested():
                try:
                    if not self.data_from_process.poll(0.1):
                        continue
                    data = self.data_from_process.recv()
                except (EOFError, OSError):
                    # The sending side closed the pipe: normal end of stream.
                    break

                if data["type"]=="plot_data":
                    self.plot_data_signal.emit(data["plot_data"],data["normalized_value"])
                elif data["type"]=="save_finished":
                    self.save_finished.emit()
                elif data["type"]=="available_devices":
                    self.devices_settings.emit(data["devices"],data["device_index"],data["volume"],data["is_normalized"],data["pan"],data["low frequency"],data["high frequency"])
                elif data["type"] == "error":
                    self.error_signal.emit(data["error_message"])
        except Exception:
            error_message = traceback.format_exc()
            self.error_signal.emit(error_message)

class Manage_Input_Device_Child_Proc(Process):

    def __init__(self, to_emitter, from_mother):
        try:
            super().__init__()
            self.daemon = False
            self.to_emitter = to_emitter
            self.data_from_mother = from_mother

            #local argument(s) save

        except:
            try:
                error_message = str(traceback.format_exc())
                to_emitter.send({"type":"error","error_message":error_message})
            except:
                pass

    def run(self):
        try:
            self.fetch_input_settings()


            self.bit_rate = 128*1024 #128 kb/sec
            self.packet_time = 125 #125 msec
            #self.packet_time = 125*44100/32768
            self.packet_size = int(16384/4)
            #self.new_sample_rate = 32768
            self.new_sample_rate = 44100
            self.TIME_WINDOW = 3000


            self.format = pyaudio.paInt16
            self.channels = 2

            self.input_stream = None
            self.output_stream = None
            self.play_status = "stopped"
            self.process_terminated = False

            while(self.process_terminated == False):
                if self.play_status == "stopped":
                    data = self.data_from_mother.get()
                else:
                    q_size = self.data_from_mother.qsize()
                    if q_size>0:
                        data = self.data_from_mother.get()
                    else:
                        data = None
                if data is not None:
                    if data["type"] == "stop-process":
                        self.process_terminated = True
                        return 1
                    if data["type"] == "save":
                        device_name = data["device_name"]
                        volume = data["volume"]
                        is_normalized = data["is_normalized"]
                        pan = data["pan"]
                        low_frequency = data["low_frequency"]
                        high_frequency = data["high_frequency"]
                        self.save(device_name,volume,is_normalized,pan,low_frequency,high_frequency)
                        break
                    elif data["type"] == "test":
                        self.output_stream = self.p.open(format=pyaudio.paInt16,channels=self.channels,rate=self.new_sample_rate,output=True,output_device_index=self.output_device_index,frames_per_buffer=self.packet_size)

                        #self.output_stream = self.p.open(format=pyaudio.paInt16,channels=self.channels,rate=self.new_sample_rate,output=True,frames_per_buffer=self.packet_size)
                        self.output_stream.start_stream()
                        self.input_device_name = data["content"]
                        for input_device in self.input_devices:
                            if(data["content"]==input_device[2]):
                                self.input_device_index = input_device[1]

                                self.input_stream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
                                self.input_stream.start_stream()
                                self.input_channels = 1

                                '''
                                try:
                                    self.input_stream = self.p.open(format=pyaudio.paInt16,channels=self.channels,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
                                    self.input_stream.start_stream()
                                    self.input_channels = self.channels
                                except Exception as e:
                                    #self.input_stream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
                                    self.input_stream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
                                    self.input_stream.start_stream()
                                    self.input_channels = 1
                                '''
                                self.play_status = "playing"
                                self.chunk_number = 0
                                self.current_duration_milliseconds = 0
                                self.now = datetime.now()
                                self.x_vals = np.array([])
                                self.y_vals = np.array([])
                    elif data["type"] == "stop":
                        self.play_status = "stopped"
                        self.chunk_number = 0
                        self.current_duration_milliseconds = 0
                        try:
                            self.output_stream.stop_stream()
                            self.output_stream.close()
                            self.input_stream.stop_stream()
                            self.input_stream.close()
                        except:
                            pass
                        self.now = datetime.now()
                        self.x_vals = np.array([])
                        self.y_vals = np.array([])
                    elif data["type"] == "volume":
                        self.volume = data["value_base_100"]
                    elif data["type"] == "is_normalized":
                        self.is_normalized = data["boolean_value"]
                    elif data["type"] == "pan":
                        self.pan = data["pan_value"]
                    elif data["type"] == "low_frequency":
                        self.low_frequency = data["low_frequency_value"]
                    elif data["type"] == "high frequency":
                        self.high_frequency = data["high_frequency_value"]

                if self.play_status=="playing":
                    in_data = self.input_stream.read(self.packet_size,exception_on_overflow = False)

                    if self.input_channels == 2:
                        slice = AudioSegment(in_data, sample_width=2, frame_rate=self.new_sample_rate, channels=2)
                    else:
                        slice = AudioSegment(in_data, sample_width=2, frame_rate=self.new_sample_rate, channels=1)
                        slice = AudioSegment.from_mono_audiosegments(slice, slice)

                    if self.pan!=0:
                        slice = slice.pan(self.pan/100)
                    if self.low_frequency>20:
                        slice = effects.high_pass_filter(slice, self.low_frequency)
                    if self.high_frequency>20000:
                        slice = effects.low_pass_filter(slice, self.high_frequency)
                    if(self.volume==0):
                        db_volume = -200
                    else:
                        db_volume =  20*math.log10(self.volume/100)
                    slice = slice+db_volume
                    if self.is_normalized:
                        slice = self.normalize_method(slice,0.1)
                    self.output_stream.write(slice.raw_data)


                    free = self.output_stream.get_write_available()
                    if free > self.packet_size: # Is there a lot of space in the buffer?
                        tofill = free - self.packet_size
                        silence = chr(0)*tofill*self.channels*2
                        self.output_stream.write(silence) # Fill it with silence
                        #free = self.output_stream.get_write_available()
                        #print(free)



                    chunk_time = len(slice)
                    samples = slice.get_array_of_samples()
                    left_samples = samples[::2]
                    right_samples = samples[1::2]
                    left_audio_data = np.frombuffer(left_samples, np.int16)[::16] #down sampling
                    right_audio_data = np.frombuffer(right_samples, np.int16)[::16] #down sampling
                    audio_data = np.vstack((left_audio_data,right_audio_data)).ravel('F')

                    time_data = np.array([])
                    for i in range(0,len(audio_data)):
                        time_data = np.append(time_data, self.now)
                        self.now = self.now+timedelta(milliseconds=chunk_time/len(audio_data))

                    self.x_vals = np.concatenate((self.x_vals, time_data))
                    self.y_vals = np.concatenate((self.y_vals, audio_data))

                    if(self.x_vals.size>audio_data.size*(self.TIME_WINDOW/chunk_time)):
                        self.x_vals = self.x_vals[audio_data.size:]
                        self.y_vals = self.y_vals[audio_data.size:]

                    average_data_value = slice.max
                    normalized_value = abs(average_data_value)/slice.max_possible_amplitude
                    if normalized_value>1:
                        normalized_value = 1
                    if self.play_status == "stopped":
                        normalized_value = 0

                    self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})

                    self.now = datetime.now()

                    self.chunk_number += 1
                    self.current_duration_milliseconds += chunk_time
        except:
            error_message = str(traceback.format_exc())
            print(error_message)
            self.to_emitter.send({"type":"error","error_message":error_message})

    def normalize_method(self,seg, headroom):
        try:
            peak_sample_val = seg.max

            # if the max is 0, this audio segment is silent, and can't be normalized
            if peak_sample_val == 0:
                return seg

            target_peak = seg.max_possible_amplitude * utils.db_to_float(-headroom)
            #target_peak = seg.max_possible_amplitude * (percent_headroom)

            needed_boost = utils.ratio_to_db(target_peak / peak_sample_val)
            return seg.apply_gain(needed_boost)
        except:
            error_message = traceback.format_exc()
            self.to_emitter.send({"type":"error","error_message":error_message})
            return seg
python pyqt5 multiprocessing pipe
1个回答
0
投票

我将 QThread 的 run 循环中读取数据的部分修改为(先用 poll() 检查管道中是否有数据,再调用 recv()):

                if self.data_from_process.poll():
                    data = self.data_from_process.recv()
                else:
                    time.sleep(0.1)
                    continue
© www.soinside.com 2019 - 2024. All rights reserved.