Python 中指定条件的多线程

问题描述 投票:0回答:1

我目前正在开发一个 Raspberry Pi 项目,其中使用模数转换器 (ADC),在我的代码中,我希望 ADC 能够连续接收信息,并用它在一个线程上接收到的数据填充一个列表,同时另一段代码正在不同的线程上运行。第二个线程正在运行一个检查磁铁检测的函数,一旦该磁铁被检测到 X 次(X 是用户定义的值),我希望代码停止运行 ADC 的线程并绘制 ADC 收到的数据。显示绘图后,此循环将不断重复,直到用户退出程序。

在我的尝试中,我在一个名为 Halleffectandadc 的类中创建了两个函数,该类中有三个函数,在这四个函数中,我希望 adreceiver() 和 Halleffect_monitor() 在两个单独的线程上运行。正如我上面提到的,我希望 adcreceiver() 和 Halleffect_monitor() 函数同时运行。我已经能够同时运行它们,当我想要存在这样一种情况时,就会出现问题:如果 Halleffect_monitor() 检测到所需的时钟周期数,它会强制 adcreceiver() 线程停止,无论如何它的大部分列表都填充了数据,并使用plot_data 函数绘制数据。我不太确定如何实现这一点,我认为使用多线程将是最好的方法,我只是不确定如何使用线程实现条件。如果有另一种方法可能更有效,我很想听听。

import spidev
import time
import numpy as np
import matplotlib.pyplot as plt
import RPi.GPIO as GPIO
import threading 

class halleffectandadc:
    def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
        self.spi = spidev.SpiDev()
        self.spi.open(0, 0)
        self.spi.max_speed_hz = clock_speed
        self.buff_size = buff_size
        self.desired_clock_cycles = desired_clock_cycles
        GPIO.setmode(GPIO.BCM)
        self.pin = pin
        GPIO.setup(pin, GPIO.IN)
        self.buff = np.ndarray((self.buff_size,), dtype="uint16")

    def get_adc_val(self):
        response = self.spi.xfer([0x00, 0x00])
        return (response[0] << 8) + response[1]

    def adcreceiver(self):
        

        self.time_start = time.perf_counter()

        i = 0

        while i < self.buff_size:
            self.buff[i] = self.get_adc_val()
            i += 1
            print(i)
            if self.event.is_set():
                    print("The thread has been stopped to process data")
                    break
        self.time_end = time.perf_counter()

    def halleffect_monitor(self):
        hall_effect_clock_count = 0
        while hall_effect_clock_count < self.desired_clock_cycles:
                print("Waiting....")
                if GPIO.wait_for_edge(self.pin, GPIO.RISING):
                        hall_effect_clock_count += 1
                        print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")       
                             
    def plot_data(self):

        elapsed_time = self.time_end - self.time_start

        time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
        self.buff[0] = self.buff[1]
        self.buff[1] = self.buff[2]
        plt.plot(time_points, self.buff / 65535 * 5)

        plt.xlabel("Elapsed Time (s)", fontsize=12)
        plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
        plt.ylabel("Voltage (V)", fontsize=12)
        plt.show()
        print(
            f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
        )

try:
    adc1 = halleffectandadc(1, 17, 22000000, 100000)
    Thread1 = threading.Thread(target=adc1.halleffect_monitor)
    Thread2 = threading.Thread(target=adc1.adcreceiver, args=(event,))
    Thread1.start()
    Thread2.start()
    
except KeyboardInterrupt:
    spi.close()


python multithreading raspberry-pi adc
1个回答
0
投票

如果我理解你的问题,你想重复收集数据并定期绘制它。我们需要在两个线程之间仔细同步,为此我们需要几个事件:

  1. self.terminate_event
    - 这是由主线程设置来告诉子线程终止。
  2. self.start_event
    -
    adcreceiver
    等待此事件以便开始接收数据。
  3. self.started_event
    - 此事件由
    adcreceiver
    设置,让
    halleffect_monitor
    知道它已开始接收数据。
  4. self.stop_event
    - 此事件由
    halleffect_monitor
    设置,以告诉
    adreceive
    在仍在接收数据的情况下停止处理。
  5. self.stopped_event
    - 此事件由
    adcreceiver
    设置,让
    halleffect_monitor
    知道它已停止处理。

这似乎有很多事件,但我相信它们是必需的,以便可以为下一个接收/绘图周期清除事件,而不必担心任何可能的竞争条件。

class halleffectandadc:
    def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
        self.terminate_event = threading.Event()
        self.start_event = threading.Event()
        self.started_event = threading.Event()
        self.stop_event = threading.Event()
        self.stopped_event = threading.Event()

        self.spi = spidev.SpiDev()
        self.spi.open(0, 0)
        self.spi.max_speed_hz = clock_speed
        self.buff_size = buff_size
        self.desired_clock_cycles = desired_clock_cycles
        GPIO.setmode(GPIO.BCM)
        self.pin = pin
        GPIO.setup(pin, GPIO.IN)

    def get_adc_val(self):
        response = self.spi.xfer([0x00, 0x00])
        return (response[0] << 8) + response[1]

    def adcreceiver(self):
        while not self.terminate_event.is_set():
            # Wait for start event:
            self.start_event.wait()
            self.start_event.clear() # clear for next time
            self.started_event.set() # show we have started

            print("The thread is starting to collect data")
            self.time_start = time.perf_counter()

            i = 0
            while i < self.buff_size and not self.stop_event.is_set():
                self.buff[i] = self.get_adc_val()
                i += 1
                print(i)

            self.time_end = time.perf_counter()

            self.stopped_event.set() # show we have stopped
            print("The thread has been stopped to process data")

    def halleffect_monitor(self):
        while not self.terminate_event.is_set():
            # start with an empty buffer for each new cycle:
            self.buff = np.ndarray((self.buff_size,), dtype="uint16")

            # clear events related to stopping
            self.stop_event.clear()
            self.stopped_event.clear()

            # start the adcreceiver thread:
            self.start_event.set()
            # wait for adcreceiver to start:
            self.started_event.wait()
            # at this point the self.start_event has been cleared by adcreceiver
            # so if adcreceiver stops because its buffer has been filled it will
            # block until self.start_event is set again

            hall_effect_clock_count = 0
            while hall_effect_clock_count < self.desired_clock_cycles:
                print("Waiting....")
                if GPIO.wait_for_edge(self.pin, GPIO.RISING):
                    hall_effect_clock_count += 1
                    print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")

            # stop the adcreceiver thread if it hasn't already stopped:
            self.stop_event.set()
            # wait for the adcreceiver thread to have stopped:
            self.stopped_event.wait()

            # plot the data:
            self.plot_data()


    def plot_data(self):
        elapsed_time = self.time_end - self.time_start

        time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
        self.buff[0] = self.buff[1]
        self.buff[1] = self.buff[2]
        plt.plot(time_points, self.buff / 65535 * 5)

        plt.xlabel("Elapsed Time (s)", fontsize=12)
        plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
        plt.ylabel("Voltage (V)", fontsize=12)
        plt.show()
        print(
            f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
        )

try:
    adc1 = halleffectandadc(1, 17, 22000000, 100000)
    t1 = threading.Thread(target=adc1.halleffect_monitor)
    t2 = threading.Thread(target=adc1.adcreceiver)
    t1.start()
    t2.start()
    input('Hit Enter to terminate ...\n')
except KeyboardInterrupt:
    pass

spi.close()
adc1.terminate_event.set()
print('Waiting for threads to finish up ...')
t1.join()
t2.join()
© www.soinside.com 2019 - 2024. All rights reserved.