我目前正在开发一个 Raspberry Pi 项目,其中使用模数转换器 (ADC),在我的代码中,我希望 ADC 能够连续接收信息,并用它在一个线程上接收到的数据填充一个列表,同时另一段代码正在不同的线程上运行。第二个线程正在运行一个检查磁铁检测的函数,一旦该磁铁被检测到 X 次(X 是用户定义的值),我希望代码停止运行 ADC 的线程并绘制 ADC 收到的数据。显示绘图后,此循环将不断重复,直到用户退出程序。
在我的尝试中,我在一个名为 Halleffectandadc 的类中创建了两个函数,该类中有三个函数,在这四个函数中,我希望 adreceiver() 和 Halleffect_monitor() 在两个单独的线程上运行。正如我上面提到的,我希望 adcreceiver() 和 Halleffect_monitor() 函数同时运行。我已经能够同时运行它们,当我想要存在这样一种情况时,就会出现问题:如果 Halleffect_monitor() 检测到所需的时钟周期数,它会强制 adcreceiver() 线程停止,无论如何它的大部分列表都填充了数据,并使用plot_data 函数绘制数据。我不太确定如何实现这一点,我认为使用多线程将是最好的方法,我只是不确定如何使用线程实现条件。如果有另一种方法可能更有效,我很想听听。
import spidev
import time
import numpy as np
import matplotlib.pyplot as plt
import RPi.GPIO as GPIO
import threading
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = clock_speed
self.buff_size = buff_size
self.desired_clock_cycles = desired_clock_cycles
GPIO.setmode(GPIO.BCM)
self.pin = pin
GPIO.setup(pin, GPIO.IN)
self.buff = np.ndarray((self.buff_size,), dtype="uint16")
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
self.time_start = time.perf_counter()
i = 0
while i < self.buff_size:
self.buff[i] = self.get_adc_val()
i += 1
print(i)
if self.event.is_set():
print("The thread has been stopped to process data")
break
self.time_end = time.perf_counter()
def halleffect_monitor(self):
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....")
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")
def plot_data(self):
elapsed_time = self.time_end - self.time_start
time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
Thread1 = threading.Thread(target=adc1.halleffect_monitor)
Thread2 = threading.Thread(target=adc1.adcreceiver, args=(event,))
Thread1.start()
Thread2.start()
except KeyboardInterrupt:
spi.close()
如果我理解你的问题,你想重复收集数据并定期绘制它。我们需要在两个线程之间仔细同步,为此我们需要几个事件:
self.terminate_event
- 这是由主线程设置来告诉子线程终止。self.start_event
- adcreceiver
等待此事件以便开始接收数据。self.started_event
- 此事件由 adcreceiver
设置,让 halleffect_monitor
知道它已开始接收数据。self.stop_event
- 此事件由 halleffect_monitor
设置,以告诉 adreceive
在仍在接收数据的情况下停止处理。self.stopped_event
- 此事件由 adcreceiver
设置,让 halleffect_monitor
知道它已停止处理。这似乎有很多事件,但我相信它们是必需的,以便可以为下一个接收/绘图周期清除事件,而不必担心任何可能的竞争条件。
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.terminate_event = threading.Event()
self.start_event = threading.Event()
self.started_event = threading.Event()
self.stop_event = threading.Event()
self.stopped_event = threading.Event()
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = clock_speed
self.buff_size = buff_size
self.desired_clock_cycles = desired_clock_cycles
GPIO.setmode(GPIO.BCM)
self.pin = pin
GPIO.setup(pin, GPIO.IN)
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
while not self.terminate_event.is_set():
# Wait for start event:
self.start_event.wait()
self.start_event.clear() # clear for next time
self.started_event.set() # show we have started
print("The thread is starting to collect data")
self.time_start = time.perf_counter()
i = 0
while i < self.buff_size and not self.stop_event.is_set():
self.buff[i] = self.get_adc_val()
i += 1
print(i)
self.time_end = time.perf_counter()
self.stopped_event.set() # show we have stopped
print("The thread has been stopped to process data")
def halleffect_monitor(self):
while not self.terminate_event.is_set():
# start with an empty buffer for each new cycle:
self.buff = np.ndarray((self.buff_size,), dtype="uint16")
# clear events related to stopping
self.stop_event.clear()
self.stopped_event.clear()
# start the adcreceiver thread:
self.start_event.set()
# wait for adcreceiver to start:
self.started_event.wait()
# at this point the self.start_event has been cleared by adcreceiver
# so if adcreceiver stops because its buffer has been filled it will
# block until self.start_event is set again
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....")
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")
# stop the adcreceiver thread if it hasn't already stopped:
self.stop_event.set()
# wait for the adcreceiver thread to have stopped:
self.stopped_event.wait()
# plot the data:
self.plot_data()
def plot_data(self):
elapsed_time = self.time_end - self.time_start
time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
t1 = threading.Thread(target=adc1.halleffect_monitor)
t2 = threading.Thread(target=adc1.adcreceiver)
t1.start()
t2.start()
input('Hit Enter to terminate ...\n')
except KeyboardInterrupt:
pass
spi.close()
adc1.terminate_event.set()
print('Waiting for threads to finish up ...')
t1.join()
t2.join()