我在尝试让项目正常运行时遇到一些问题,希望有人可以提供帮助。对于该项目,我使用 Raspberry Pi 4B 和一个包含传感器并通过 BLE 发送数据的开发套件。
该项目由一个作为边缘设备的 Raspberry Pi 组成,它通过 BLE 读取传感器数据、过滤数据并进行处理。对于处理部分,我想使用两个机器学习模型和可编程逻辑(对数据进行数学计算)处理相同的数据。我想要实现的输出是知道我的开发套件的位置(它位于电梯顶上,因此我可以知道垂直位置以及门的位置)。
当我尝试单独运行处理代码时(一个项目具有可编程逻辑,另一个项目具有两个 ML 模型),它们工作正常,但当我加入它们时,负责可编程逻辑处理的线程无法正常工作。
from bluepy import btle
import threading
import numpy as np
import tensorflow as tf
import time
import variables as var
import functions as func
class MyDelegate(btle.DefaultDelegate):
def __init__(self):
btle.DefaultDelegate.__init__(self)
def handleNotification(self, cHandle, data):
var.acc_z[0] = int.from_bytes(data[6:8],"little",signed=True)
var.acc_y[0] = int.from_bytes(data[4:6],"little",signed=True)
var.acc_x[0] = int.from_bytes(data[2:4],"little",signed=True)
var.mag_x[0] = int.from_bytes(data[14:16],"little",signed=True)/1000#Convert mGAuss to Gauss
var.mag_y[0] = int.from_bytes(data[16:18],"little",signed=True)/1000#Convert mGAuss to Gauss
var.mag_z[0] = int.from_bytes(data[18:20],"little",signed=True)/1000#Convert mGAuss to Gauss
func.Filter_Data()
if var.calculate_average == 1:
func.Calculate_Average()
else:
var.ready_read = True
if var.count_buffer_DOORS < var.WINDOW_STEP_DOORS - 1:
var.count_buffer_DOORS += 1
var.window_buffer_DOORS = var.window_buffer_DOORS[1:]+[var.f_acc_x[0]-var.average_acc_x]
else:
var.count_buffer_DOORS = 0
var.window_buffer_DOORS = var.window_buffer_DOORS[1:]+[var.f_acc_x[0]-var.average_acc_x]
var.ready_model_DOORS = 1
if var.count_buffer_CAB < var.WINDOW_STEP_CAB - 1:
var.count_buffer_CAB += 1
var.window_buffer_CAB = var.window_buffer_CAB[1:]+[var.f_acc_y[0]-var.average_acc_y]
else:
var.count_buffer_CAB = 0
var.window_buffer_CAB = var.window_buffer_CAB[1:]+[var.f_acc_y[0]-var.average_acc_y]
var.ready_model_CAB = 1
#Thread where programmed logic calculations run
def programmedLogic_THREAD():
while var.stop_thread == False:
if (var.ready_read == True):
func.Calculate_Desv_Est_Mag()
func.Calculate_Position_Cab()
if var.flag_ready == 1 and var.learn_mag == 1:
func.Learn_Mag()
func.Calculate_Position_Doors()
if var.update_average == 1:
func.Update_Average()
else:
var.count_average = 0
var.sum_acc_z = 0
var.sum_acc_y = 0
var.sum_acc_x = 0
if var.state == 1: var.DB_LP_cab = "Up"
elif var.state == -1: var.DB_LP_cab = "Down"
elif var.state == 0: var.DB_LP_cab = "Still"
if var.move_doors == 1: var.DB_LP_doors = "Opening"
elif var.move_doors == -1: var.DB_LP_doors = "Closing"
elif var.move_doors == 0: var.DB_LP_doors = "Still"
var.ready_read = False
#Thread where ML Doors Model runs and predicts
def doorsML_THREAD():
while var.stop_thread == False:
if (var.ready_model_DOORS == 1) and (var.ML_flag == True):
var.ready_model_DOORS = 0
x = np.expand_dims(var.window_buffer_DOORS, axis = 1)
var.result_DOORS = custom_model_puertas.predict(x.T.tolist(), verbose = 0)
var.doors_State = np.argmax(var.result_DOORS)
var.array_doors_State = var.array_doors_State[1:]+[var.doors_State]
if all(element == var.array_doors_State[0] for element in var.array_doors_State):
var.print_doors_State = var.array_doors_State[0]
if var.print_doors_State == 0:
var.DB_ML_doors = "Opening"
elif var.print_doors_State == 1:
var.DB_ML_doors = "Closing"
elif var.print_doors_State == 2:
var.DB_ML_doors = "Still"
#Thread where ML Cab Model runs and predicts
def cabML_THREAD():
while var.stop_thread == False:
if (var.ready_model_CAB == 1) and (var.ML_flag == True):
var.ready_model_CAB = 0
x = np.expand_dims(var.window_buffer_CAB, axis = 1)
var.result_CAB = custom_model_cabina.predict(x.T.tolist(), verbose = 0)
var.cab_State = np.argmax(var.result_CAB)
var.array_cab_State = var.array_cab_State[1:]+[var.cab_State]
if all(element == var.array_cab_State[0] for element in var.array_cab_State):
var.print_cab_State = var.array_cab_State[0]
if var.print_cab_State == 0:
var.DB_ML_cab = "Down"
elif var.print_cab_State == 1:
var.DB_ML_cab = "Still"
elif var.print_cab_State == 2:
var.DB_ML_cab="Up"
#Thread that prints results in terminal
def print_THREAD():
count = 0
while (var.stop_thread == False):
count+=1
if count>=20:
count=0
var.programmedLogic_flag = not var.programmedLogic_flag
var.ML_flag = not var.ML_flag
if var.programmedLogic_flag == True:
print(f"\n\tLP Data:\nCab state:\tDoors state:\n{var.DB_LP_cab}\t\t{var.DB_LP_doors}")
elif var.ML_flag == True:
print(f"\n\tML Data:\nCab state:\tDoors state:\n{var.DB_ML_cab}\t\t{var.DB_ML_doors}")
time.sleep(0.5)
#Create threads
thread1 = threading.Thread(target=programmedLogic_THREAD)
thread2 = threading.Thread(target=doorsML_THREAD)
thread3 = threading.Thread(target=cabML_THREAD)
thread4 = threading.Thread(target=print_THREAD)
#Upload ML Models
print("Uploading ML Models...")
custom_model_puertas = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/doors_MODEL.h5')
custom_model_cabina = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/cab_MODEL.h5')
print("Successfully uploaded")
#Connect to BLE device
print("Connecting ...")
p = btle.Peripheral(var.MAC_ADDR,addrType=btle.ADDR_TYPE_RANDOM)
p.setDelegate( MyDelegate() )
print("Connected to DA:BB:C5:28:12:A7")
# Setup to turn notifications on
svc = p.getServiceByUUID("00000000-0001-11e1-9ab4-0002a5d5c51b")
ch = svc.getCharacteristics("00e00000-0001-11e1-ac36-0002a5d5c51b")[0]
p.writeCharacteristic(ch.valHandle+1, b"\x01\x00")
#Initialize threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()
while True:
try:
if p.waitForNotifications(1.0):
continue
except KeyboardInterrupt:
p.disconnect()
print("\nDevice disconnected")
var.stop_thread = True
thread1.join()
thread2.join()
thread3.join()
thread4.join()
break
在我添加的项目中,我使用两个文件(变量和函数),其中定义了所有变量和函数。我使用了 4 个线程,这样我就可以同时运行多个任务。另外,我通过通知读取BLE数据,所以我有中断功能。
当我初始化所有线程时,负责可编程逻辑的线程不起作用(它执行错误的计算),但是当我不初始化线程 2 和 3(负责 ML 预测)时,可编程逻辑逻辑线程工作得很好(所以我可以排除我用来计算位置的函数是错误的)。
我在下面附上命令的屏幕截图htop:
希望任何人都能看到我在哪里犯了错误,或者哪里缺少了什么......我很困惑,不知道如何继续前进。
从任何线程访问全局变量都是潜在的竞争条件,这就是全局变量是邪恶的原因。
import variables as var
var.acc_z[0] = int.from_bytes(data[6:8],"little",signed=True)
# multiple other accesses to this variable..
这个
var
遍布整个代码,对它的任何访问都是竞争条件。
完全删除这个
var
。
用多个全局队列代替它
from queue Queue
from_thread1_to_thread2 = Queue()
from_thread1_to_thread3 = Queue()
# etc ...
当一个线程想要向另一个线程发送信息时,它只会将新的信息放在该队列上,而另一个线程将定期从该队列读取数据,因为队列是线程安全的。
有关该主题的更多信息,您应该阅读有关生产者-消费者模式的内容。