Python 中的多线程无法在 Raspberry Pi 上正常工作

问题描述 投票:0回答:1

我在尝试让项目正常运行时遇到一些问题,希望有人可以提供帮助。对于该项目,我使用 Raspberry Pi 4B 和一个包含传感器并通过 BLE 发送数据的开发套件。

该项目由一个作为边缘设备的 Raspberry Pi 组成,它通过 BLE 读取传感器数据、过滤数据并进行处理。对于处理部分,我想使用两个机器学习模型和可编程逻辑(对数据进行数学计算)处理相同的数据。我想要实现的输出是知道我的开发套件的位置(它位于电梯顶上,因此我可以知道垂直位置以及门的位置)。

当我尝试单独运行处理代码时(一个项目具有可编程逻辑,另一个项目具有两个 ML 模型),它们工作正常,但当我加入它们时,负责可编程逻辑处理的线程无法正常工作。

from bluepy import btle
import threading
import numpy as np
import tensorflow as tf
import time
import variables as var
import functions as func

class MyDelegate(btle.DefaultDelegate):
    def __init__(self):
        btle.DefaultDelegate.__init__(self)

    def handleNotification(self, cHandle, data):
        var.acc_z[0] = int.from_bytes(data[6:8],"little",signed=True)
        var.acc_y[0] = int.from_bytes(data[4:6],"little",signed=True)
        var.acc_x[0] = int.from_bytes(data[2:4],"little",signed=True)
        var.mag_x[0] = int.from_bytes(data[14:16],"little",signed=True)/1000#Convert mGAuss to Gauss
        var.mag_y[0] = int.from_bytes(data[16:18],"little",signed=True)/1000#Convert mGAuss to Gauss
        var.mag_z[0] = int.from_bytes(data[18:20],"little",signed=True)/1000#Convert mGAuss to Gauss

        func.Filter_Data()            
        if var.calculate_average == 1:
            func.Calculate_Average()
        else:
            var.ready_read = True
            if var.count_buffer_DOORS < var.WINDOW_STEP_DOORS - 1:
                var.count_buffer_DOORS += 1
                var.window_buffer_DOORS = var.window_buffer_DOORS[1:]+[var.f_acc_x[0]-var.average_acc_x]
            else: 
                var.count_buffer_DOORS = 0
                var.window_buffer_DOORS = var.window_buffer_DOORS[1:]+[var.f_acc_x[0]-var.average_acc_x]
                var.ready_model_DOORS = 1
            
            if var.count_buffer_CAB < var.WINDOW_STEP_CAB - 1: 
                var.count_buffer_CAB += 1
                var.window_buffer_CAB = var.window_buffer_CAB[1:]+[var.f_acc_y[0]-var.average_acc_y]
            else: 
                var.count_buffer_CAB = 0
                var.window_buffer_CAB = var.window_buffer_CAB[1:]+[var.f_acc_y[0]-var.average_acc_y]
                var.ready_model_CAB = 1

#Thread where programmed logic calculations run 
def programmedLogic_THREAD():
    while var.stop_thread == False: 
        if (var.ready_read == True):            
            func.Calculate_Desv_Est_Mag()
            func.Calculate_Position_Cab()
            if var.flag_ready == 1 and var.learn_mag == 1: 
                func.Learn_Mag()                
            func.Calculate_Position_Doors()
            if var.update_average == 1:
                func.Update_Average()
            else: 
                var.count_average = 0
                var.sum_acc_z = 0
                var.sum_acc_y = 0
                var.sum_acc_x = 0
            if var.state == 1: var.DB_LP_cab = "Up"
            elif var.state == -1: var.DB_LP_cab = "Down"
            elif var.state == 0: var.DB_LP_cab = "Still"

            if var.move_doors == 1: var.DB_LP_doors = "Opening"
            elif var.move_doors == -1: var.DB_LP_doors = "Closing"
            elif var.move_doors == 0: var.DB_LP_doors = "Still"
            var.ready_read = False
        
#Thread where ML Doors Model runs and predicts
def doorsML_THREAD():
    while var.stop_thread == False: 
        if (var.ready_model_DOORS == 1) and (var.ML_flag == True): 
            var.ready_model_DOORS = 0
            x = np.expand_dims(var.window_buffer_DOORS, axis = 1)
            var.result_DOORS = custom_model_puertas.predict(x.T.tolist(), verbose = 0)
            var.doors_State = np.argmax(var.result_DOORS)          
            var.array_doors_State = var.array_doors_State[1:]+[var.doors_State]
            if all(element == var.array_doors_State[0] for element in var.array_doors_State):
                var.print_doors_State = var.array_doors_State[0]
                if var.print_doors_State == 0: 
                    var.DB_ML_doors = "Opening"
                elif var.print_doors_State == 1: 
                    var.DB_ML_doors = "Closing"
                elif var.print_doors_State == 2: 
                    var.DB_ML_doors = "Still"

#Thread where ML Cab Model runs and predicts
def cabML_THREAD():
    while var.stop_thread == False: 
        if (var.ready_model_CAB == 1) and (var.ML_flag == True): 
            var.ready_model_CAB = 0
            x = np.expand_dims(var.window_buffer_CAB, axis = 1)
            var.result_CAB = custom_model_cabina.predict(x.T.tolist(), verbose = 0)
            var.cab_State = np.argmax(var.result_CAB)
            var.array_cab_State = var.array_cab_State[1:]+[var.cab_State]
            if all(element == var.array_cab_State[0] for element in var.array_cab_State):
                var.print_cab_State = var.array_cab_State[0]
                if var.print_cab_State == 0: 
                    var.DB_ML_cab = "Down"
                elif var.print_cab_State == 1: 
                    var.DB_ML_cab = "Still"
                elif var.print_cab_State == 2: 
                   var.DB_ML_cab="Up"

#Thread that prints results in terminal
def print_THREAD():
    count = 0
    while (var.stop_thread == False):
        count+=1
        if count>=20: 
            count=0
            var.programmedLogic_flag = not var.programmedLogic_flag
            var.ML_flag = not var.ML_flag
        if var.programmedLogic_flag == True:         
            print(f"\n\tLP Data:\nCab state:\tDoors state:\n{var.DB_LP_cab}\t\t{var.DB_LP_doors}") 
        elif var.ML_flag == True:
            print(f"\n\tML Data:\nCab state:\tDoors state:\n{var.DB_ML_cab}\t\t{var.DB_ML_doors}")
        time.sleep(0.5)

#Create threads
thread1 = threading.Thread(target=programmedLogic_THREAD)
thread2 = threading.Thread(target=doorsML_THREAD)
thread3 = threading.Thread(target=cabML_THREAD)
thread4 = threading.Thread(target=print_THREAD)

#Upload ML Models
print("Uploading ML Models...")
custom_model_puertas = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/doors_MODEL.h5')
custom_model_cabina = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/cab_MODEL.h5')
print("Successfully uploaded")

#Connect to BLE device
print("Connecting ...")
p = btle.Peripheral(var.MAC_ADDR,addrType=btle.ADDR_TYPE_RANDOM)
p.setDelegate( MyDelegate() )
print("Connected to DA:BB:C5:28:12:A7")

# Setup to turn notifications on
svc = p.getServiceByUUID("00000000-0001-11e1-9ab4-0002a5d5c51b")
ch = svc.getCharacteristics("00e00000-0001-11e1-ac36-0002a5d5c51b")[0]

p.writeCharacteristic(ch.valHandle+1, b"\x01\x00")

#Initialize threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()

while True:
    try:
        if p.waitForNotifications(1.0):
            continue
        
    except KeyboardInterrupt:
        p.disconnect()
        print("\nDevice disconnected")
        var.stop_thread = True
        thread1.join()
        thread2.join()
        thread3.join()        
        thread4.join()
        break         


在我添加的项目中,我使用两个文件(变量和函数),其中定义了所有变量和函数。我使用了 4 个线程,这样我就可以同时运行多个任务。另外,我通过通知读取BLE数据,所以我有中断功能。

当我初始化所有线程时,负责可编程逻辑的线程不起作用(它执行错误的计算),但是当我不初始化线程 2 和 3(负责 ML 预测)时,可编程逻辑逻辑线程工作得很好(所以我可以排除我用来计算位置的函数是错误的)。

我在下面附上命令的屏幕截图htop

htop 命令截图

希望任何人都能看到我在哪里犯了错误,或者哪里缺少了什么......我很困惑,不知道如何继续前进。

machine-learning python-multithreading iot raspberry-pi4
1个回答
0
投票

从任何线程访问全局变量都是潜在的竞争条件,这就是全局变量是邪恶的原因。

import variables as var

var.acc_z[0] = int.from_bytes(data[6:8],"little",signed=True)
# multiple other accesses to this variable..

这个

var
遍布整个代码,对它的任何访问都是竞争条件。

完全删除这个

var
。 用多个全局队列代替它

from queue Queue

from_thread1_to_thread2 = Queue()
from_thread1_to_thread3 = Queue()
# etc ...

当一个线程想要向另一个线程发送信息时,它只会将新的信息放在该队列上,而另一个线程将定期从该队列读取数据,因为队列是线程安全的。

有关该主题的更多信息,您应该阅读有关生产者-消费者模式的内容。

© www.soinside.com 2019 - 2024. All rights reserved.