通过串口连接ESP32到Python接口

问题描述 投票:0回答:1

我在使用串行连接将 ESP32 连接到 python 接口时遇到问题。 ESP32 位于通过 Arduino 读取传感器值的系统上。我使用 Python 界面来显示这些读数,并将它们显示为图表。问题是,当我将 ESP32 连接到接口时,我收到此日志错误: utf-8' codec can't Decode bytes inposition 2-3: invalid continuation byte 但是当我尝试使用另一个未链接到的 ESP32 时传感器系统,但有一个 Arduino 代码,可以在所需的时间间隔内生成值,它工作得很好。

有谁知道如何解决这个问题吗?如果需要,我可以分享我的 Arduino 和 Python 代码。

在此之前,我在使用 pyserial 库时遇到了麻烦,必须卸载并安装它才能正常工作。

阿杜诺:

//--------------------------------------------- Librairies ------------------------------------------------------------------//
#include <Wire.h>
#include <Adafruit_ADS1X15.h>
#include "FS.h"
#include "SD.h"
#include "SPI.h"
#include "BluetoothSerial.h"
#include "RTClib.h"
#include <OneWire.h>
#include <DallasTemperature.h>

#define SENSOR_PIN  2
//---------------------------------------------------------------------------------------------------------------------------//
Adafruit_ADS1115 ads; // Adresse I2C de l'ADS1115
File dataFile;
//-------------------------------- Variables pour minuteur d'acquisition ----------------------------------------------------//
// Minuteur, mieux qu'un delay car un delay bloque le code et lui dit de ne rien faire pendant le temps bloqué, pas le cas avec un minuteur
unsigned long Clck_Aqc = millis();
unsigned long delai_ACQ = 1000;

unsigned long Clck_uSD = millis();
unsigned long delai_uSD = 1000 * 5;
//---------------------------------------------------------------------------------------------------------------------------//

//--------------------------------------------- Variables pour ADC ----------------------------------------------------------//
uint32_t adc0_DO = 0;
uint32_t adc1 = 0;
uint32_t adc2_temp =0;
float DO_value = 0;
float pH_value = 0;
//---------------------------------------------------------------------------------------------------------------------------//

//--------------------------------------------- Variabless DS18B20 ----------------------------------------------------------//
float Temperature = 0;
OneWire oneWire(SENSOR_PIN);
DallasTemperature DS18B20(&oneWire);
//---------------------------------------------------------------------------------------------------------------------------//

//--------------------------------------------- Variables pour carte uSD ----------------------------------------------------//
String labelsDonneesSD = "Date,DO,PH,Temperature \r\n";
String donneesSD = "";
int SD_CS = 5;
//---------------------------------------------------------------------------------------------------------------------------//

//---------------------------------------------- Variables pour l'horloge ---------------------------------------------------//
RTC_DS3231 rtc; //module RTC
String Date;
String heure;
//---------------------------------------------------------------------------------------------------------------------------//
void setup() {
  Serial.begin(9600);
  DS18B20.begin();    // initialize the DS18B20 sensor
  Wire.begin();
  Start_ADC();
  Start_uSD();
  initRTC();
}
   //---------------------------------------------------------------------------------------------------------------------------//
    
void loop() {
  if (millis() - Clck_Aqc >= delai_ACQ) {
    Clck_Aqc = millis();
    Lire_ADC();
    AfficheValeurs();                          
  }
    if (millis() - Clck_uSD >= delai_uSD) {
    Clck_uSD = millis();
    // Ajout des données au fichier CSV
    Enregistrement_SD();                             
  } //if millis
} //void loop

Void AfficheValeurs(){
 // Affiche dans le moniteur série
  Serial.print("DO : ");
  Serial.print(DO_value);
  Serial.print(", ");
  Serial.print("pH : ");
  Serial.print(pH_value);
  Serial.print(", ");
  Serial.print("Temperature : ");
  Serial.print(Temperature);
  Serial.print("C");
  Serial.print("\n");
} // void AfficheValeurs

Python:

from settings import *
from main_menu import MainMenu
from graphs import Graph4H, Graph24H
from log_menu import LogMenu

#TODO:
"""
Add a settings menu for:
 - autosaving capacity (switch)
 - Changing the port (combobox)
 - changing the baudrate (combobox)
"""


class App(ctk.CTk):
    def __init__(self):
        super().__init__()
        self.geometry("900x600+100+50")
        self.title("Bioreacteur")

        self.small_size_font = ctk.CTkFont(size=14)
        self.medium_size_font = ctk.CTkFont(size=18)
        self.large_size_font = ctk.CTkFont(size=24)
        self.huge_size_font = ctk.CTkFont(size=36)

        self.has_exited = False
        self.serial_port = None 
        self.serial_thread = None

        self.last_opened_tab = None
                
        self.port = ctk.StringVar(value="COM4")
        self.baudrate = ctk.IntVar(value=9600)
        
        self.do = ctk.DoubleVar()
        self.ph = ctk.DoubleVar()
        self.temp = ctk.DoubleVar()
        
        self.do.trace_add("write", lambda *args: self.menu_frame.do_label.configure(text=f"DO : {self.do.get()}"))
        self.ph.trace_add("write", lambda *args: self.menu_frame.ph_label.configure(text=f"pH : {self.ph.get()}"))
        self.temp.trace_add("write", lambda *args: self.menu_frame.temperature_label.configure(text=f"Temperature : {self.temp.get()}"))

        self.create_widgets()
        
        # csv_filename = f"data/d-{datetime.now().strftime('%d-%m-%Y')}.csv" if seperated method is wanted
        self.day_data = {}
        if self.file_check(CSV_FILENAME):
            self.day_data = self.load_data("4H")
            
        log_filename = f"logs/log-{datetime.now().strftime('%d-%m-%Y')}.json"
        self.logs = []
        if self.file_check(log_filename):
            logs = self.load_logs(log_filename)
            if logs is None:
                return 
            
            for log in logs:
                self.log_errors(log)

    def file_check(self, filename: str):
        """
        Checks if a file exists and creates it if not.

        Args:
            filename (str): Path to the file.
        """
        if not (return_val := os.path.exists(filename)):
            # Create the file with write mode ('w') and newline='' for proper handling
            with open(filename, 'x', newline='') as f:
                if filename.endswith('.csv'):
                    writer = csv.writer(f)
                    writer.writerow(["Day","Timestamp", "DO", "pH", "Temperature"])
        
        return return_val

    def create_widgets(self):
        def open_tab(frame: ctk.CTkFrame, title: str):
            self.title(title)
            frame.place(relx=0, rely=0, relwidth=1, relheight=0.9)
            frame.extra()
            if self.last_opened_tab is not None: frame.lift(self.last_opened_tab)
            self.last_opened_tab = frame

        # Tab Frames
        self.menu_frame = MainMenu(master=self)
        self.graph_4h_frame = Graph4H(master=self)
        self.graph_24h_frame = Graph24H(master=self)
        self.log_errors_frame = LogMenu(master=self)

        # Tab switching Buttons
        self.menu_btn = ctk.CTkButton(
            master=self, 
            text="Menu", 
            corner_radius=0, 
            fg_color=BUTTON_FG_COLOR,
            text_color=BUTTON_TEXT_COLOR,
            command=lambda: open_tab(self.menu_frame, "Bioreacteur: Menu")
        )
        self.graph_4h_btn = ctk.CTkButton(
            master=self, 
            text="Graph 4h", 
            corner_radius=0, 
            fg_color=BUTTON_FG_COLOR,
            text_color=BUTTON_TEXT_COLOR,
            command=lambda: open_tab(self.graph_4h_frame, "Bioreacteur: Graph des 4 dernières heures")
        )
        self.graph_24h_btn = ctk.CTkButton(
            master=self, 
            text="Graph 24h", 
            corner_radius=0, 
            fg_color=BUTTON_FG_COLOR,
            text_color=BUTTON_TEXT_COLOR,
            command=lambda: open_tab(self.graph_24h_frame, "Bioreacteur: Graph des 24 dernières heures")
        )
        self.log_errors_btn = ctk.CTkButton(
            master=self, 
            text="Log Errors", 
            corner_radius=0, 
            fg_color=BUTTON_FG_COLOR,
            text_color=BUTTON_TEXT_COLOR,
            command=lambda: open_tab(self.log_errors_frame, "Bioreacteur: Log Erreurs")
        )

        self.menu_btn.place(relx=0, rely=1, relwidth=0.25, relheight=0.1, anchor="sw")
        self.graph_4h_btn.place(relx=0.25, rely=1, relwidth=0.25, relheight=0.1, anchor="sw")
        self.graph_24h_btn.place(relx=0.5, rely=1, relwidth=0.25, relheight=0.1, anchor="sw")
        self.log_errors_btn.place(relx=0.75, rely=1, relwidth=0.25, relheight=0.1, anchor="sw")
        
        open_tab(self.menu_frame, "Bioreacteur: Menu")

    def log_errors(self, error_message: Union[Exception, str], custom_error_message: Optional[str] = "Erreur lors de l'analyse des données :"):
        print(error_message)
        error_message = error_message if isinstance(error_message, str) else f"({datetime.now().strftime('%d/%m/%Y, %H:%M:%S')}): {custom_error_message}\n{str(error_message)}\n"
        self.menu_frame.log_error_text.insert(ctk.END, error_message)
        self.log_errors_frame.text.insert(ctk.END, error_message)
        self.logs.append(error_message)

    def connect(self, timeout: int = 1):
        self.toggle_enregistrement_auto()
    
        # Cette fonction est appelée lors du clic sur le bouton "Se connecter"
        print(self.port.get(), self.baudrate.get())
        # Essaye de se connecter à la carte
        try:
            self.serial_port = serial.Serial(port=self.port.get(), baudrate=self.baudrate.get(), timeout=timeout)  # Ouverture du port série
            if not self.serial_port.is_open:
                self.serial_port.open()
            # Si la connexion est réussie, mettre à jour l'interface utilisateur
            self.menu_frame.connect_btn.configure(text="Connecté", state=DISABLED)
            # Lance la récupération des données
            self.start_reading_serial()
        except Exception as e:
            # En cas d'échec de la connexion, afficher un message d'erreur dans le log
            self.log_errors(e)

    def start_reading_serial(self):
        # Cette fonction démarre le thread de lecture des données série
        self.serial_thread = thread.Thread(target=self.read_serial)
        self.serial_thread.daemon = True  # Le thread se terminera lorsque le programme principal se terminera
        self.serial_thread.start()

    def enregistrer_csv(self):
        # Récupére les valeurs actuelles des labels DO, pH et Température
        self.do_value = self.do.get()
        self.ph_value = self.ph.get()
        self.temperature = self.temp.get()

        if (self.do_value, self.ph_value, self.temperature) == (0, 0, 0):
            return

        # Obtenir la date et l'heure actuelles
        current_day, current_time = datetime.now().strftime("%d-%m-%Y, %H:%M:%S").split(", ")

        for category, data in (("Timestamp", current_time), ("DO", self.do_value), ("pH", self.ph_value), ("Temperature", self.temperature)):
            self.day_data[current_day][category].append(data)

        # Écrire les données dans un fichier CSV
        with open(f"data/data.csv", mode="a", newline="") as file:
            writer = csv.writer(file)
            # Écrire les en-têtes de colonnes s'il s'agit d'un nouveau fichier
            writer.writerow([current_day, current_time, self.do_value, self.ph_value, self.temperature])

    def load_data(self, _type: Literal["4H", "24H"]) -> dict[str, dict[str, list[int]]]:
        output_dict = {}
                
        with open(CSV_FILENAME, mode="r") as file:
            reader = csv.reader(file)
            
            data = list(reader)
            if len(data) == 0:
                return output_dict
            
            headers = data[0]  # Get the headers from the first row

            # Process each data row
            if _type == "4H":
                date_str = datetime.now().strftime("%d-%m-%Y")
                
                output_dict[date_str] = {header: [] for header in headers[1:]}
                day_data = output_dict[date_str]
                
                for row in data[1:]:
                    if row[0] != date_str:
                        continue                        
                        
                    for i, value in enumerate(row):
                        if i == 0:
                            continue
                        
                        day_data[headers[i]].append(value)
                    
            else:
                previous_date = data[1][0] #Get the first date of the data
                for row_index in range(len(data)):
                    if row_index == 0:
                        continue
                    
                    row = data[row_index]  #Get the current row

                    date_str = row[0] #Get the date at position 0 of the row
                    
                    if date_str != previous_date: #Check if the new date is different from the last
                        previous_date = date_str #If the case then sum up all the values of the day before reseting them

                        for header, values in day_data.items():
                            if header == "Timestamp":
                                continue

                            day_data[header] = round(sum(values) / len(values), 2)  # Calculer la moyenne des valeurs
                    
                    if date_str not in output_dict.keys():
                        output_dict[date_str] = {header: [] for header in headers[1:]}
                        
                    day_data = output_dict[date_str]
                                        
                    for i, value in enumerate(row):
                        if i == 0:
                            continue
                        
                        day_data[headers[i]].append(float(value) if len(value) <= 5 else value)
                
                for header, values in day_data.items():
                    if header == "Timestamp":
                        continue

                    day_data[header] = round(sum(values) / len(values), 2)  # Calculer la moyenne des valeurs
            
        return output_dict

    def load_logs(self, filename: str) -> list[str]:                
        try:
            with open(filename, "r") as f:
                data = json.load(f)
            return data
        except (json.JSONDecodeError, FileNotFoundError) as e:
            # Handle the empty file case or other exceptions
            self.log_errors(error_message=e, custom_error_message="Loading Error: Could not load logs from file:")
            return []
            
        
    def save_logs(self, filename: str, data: list[str]):                
        try:
            with open(filename, "w") as f:
                json.dump(data, f)
        except Exception as e:
            # Handle the empty file case or other exceptions
            self.log_errors(e)                
        

    def read_serial(self):
        # Cette fonction lit les données série et les affiche dans l'interface graphique
        while True:
            try:
                # Lire les données série
                print(self.serial_port.readline())
                data = self.serial_port.readline().decode("utf-8").strip()
                if data == "":
                    continue
                # Analyser les données
                self.analyze_data(data)
            except Exception as e:
                # Afficher l'erreur dans l'interface graphique
                self.log_errors(e)
                print(e)
                
                # Arrêter le thread de lecture des données série
                # if self.serial_thread: self.serial_thread.join()
                self.has_exited = True

                # Quitter le programme
                exit(1)

    def analyze_data(self, data: str):
        # Cette fonction analyse les données série et met à jour les labels DO, pH et Température
        try:
            if data == "":
                return
            
            # Séparer les données en trois parties : DO, pH et Température
            do, ph, temperature = data.split(",")
            do = do.split(":")[1]
            ph = ph.split(":")[1]
            temperature = temperature.split(":")[1]
            temperature = temperature[:-2]
            
            # Convertir les données en nombres
            do = round(float(do), 2)
            ph = round(float(ph), 2)
            temperature = round(float(temperature), 2)

            # Mettre à jour les labels DO, pH et Température
            self.do.set(do)
            self.ph.set(ph)
            self.temp.set(temperature)
        except Exception as e:
            # Afficher l'erreur dans l'interface graphique
            self.log_errors(e)

    def toggle_enregistrement_manuel(self):
        self.enregistrement_auto_active = False
        self.menu_frame.save_btn.configure(state="normal")
        
    def toggle_enregistrement_auto(self):
            self.enregistrement_auto_active = True
            self.menu_frame.save_btn.configure(state="disabled")  # Désactiver le bouton pour éviter d'être pressé à nouveau
            self.start_auto_enregistrement()
    
    def start_auto_enregistrement(self):
        if self.enregistrement_auto_active or not self.has_exited:
            self.enregistrer_csv()
            self.after(1000, self.start_auto_enregistrement)  # Appeler cette méthode toutes les 1000 millisecondes (1 seconde)

    def destroy(self):
        # if self.serial_thread is not None: self.serial_thread.join(99)
        # if self.serial_port is not None: self.serial_port.close()
        self.has_exited = True

        try:
            log_filename = f"logs/log-{datetime.now().strftime('%d-%m-%Y')}.json"
            if self.file_check(log_filename): self.save_logs(log_filename, self.logs)
        except Exception as e:
            self.log_errors(e)
   
        # print(self.csv_load_data(_type="4h"))

        super().destroy()

if __name__ == "__main__":
    app = App()
    app.mainloop()
python interface arduino esp32 sensors
1个回答
0
投票

找到问题的解决方案了! ESP32 EN/RESET 引脚上的 10μF 电容阻碍了 python 接口接收数据,将其取下即可解决问题。尽管如此,还是感谢您的帮助。

© www.soinside.com 2019 - 2024. All rights reserved.