我在通过串口将 ESP32 连接到 Python 界面时遇到了问题。ESP32 安装在一个通过 Arduino 代码读取传感器值的系统上。我使用 Python 界面来显示这些读数,并将它们绘制成图表。问题是,当我将 ESP32 连接到界面时,会收到如下错误日志:'utf-8' codec can't decode bytes in position 2-3: invalid continuation byte。但是,当我改用另一块没有连接到传感器系统的 ESP32(上面运行的 Arduino 代码只是按所需的时间间隔生成数值)时,它工作得很好。
有谁知道如何解决这个问题吗?如果需要,我可以分享我的 Arduino 和 Python 代码。
在此之前,我在使用 pyserial 库时遇到了麻烦,必须卸载并安装它才能正常工作。
Arduino:
//--------------------------------------------- Librairies ------------------------------------------------------------------//
#include <Wire.h>
#include <Adafruit_ADS1X15.h>
#include "FS.h"
#include "SD.h"
#include "SPI.h"
#include "BluetoothSerial.h"
#include "RTClib.h"
#include <OneWire.h>
#include <DallasTemperature.h>
#define SENSOR_PIN 2
//---------------------------------------------------------------------------------------------------------------------------//
Adafruit_ADS1115 ads; // ADS1115 ADC driver object (original note: I2C address of the ADS1115)
File dataFile;
//-------------------------------- Acquisition timer variables ----------------------------------------------------//
// Timers based on millis() are preferred over delay(): delay() blocks the sketch for the whole wait, a timer does not.
// The Clck_* values hold the millis() timestamp of the last run; the delai_* values are the periods in milliseconds.
unsigned long Clck_Aqc = millis();
unsigned long delai_ACQ = 1000;
unsigned long Clck_uSD = millis();
unsigned long delai_uSD = 1000 * 5;
//---------------------------------------------------------------------------------------------------------------------------//
//--------------------------------------------- ADC variables ----------------------------------------------------------//
// Raw ADC readings and the values derived from them (presumably filled in by Lire_ADC(), which is not shown here).
uint32_t adc0_DO = 0;
uint32_t adc1 = 0;
uint32_t adc2_temp =0;
float DO_value = 0;
float pH_value = 0;
//---------------------------------------------------------------------------------------------------------------------------//
//--------------------------------------------- DS18B20 variables ----------------------------------------------------------//
float Temperature = 0;
// OneWire bus on SENSOR_PIN, shared with the DallasTemperature driver below.
OneWire oneWire(SENSOR_PIN);
DallasTemperature DS18B20(&oneWire);
//---------------------------------------------------------------------------------------------------------------------------//
//--------------------------------------------- uSD card variables ----------------------------------------------------//
// CSV header line and working buffer for the rows written to the card.
String labelsDonneesSD = "Date,DO,PH,Temperature \r\n";
String donneesSD = "";
// NOTE(review): presumably the SD card chip-select pin — confirm against Start_uSD().
int SD_CS = 5;
//---------------------------------------------------------------------------------------------------------------------------//
//---------------------------------------------- Clock variables ---------------------------------------------------//
RTC_DS3231 rtc; // RTC module
String Date;
String heure;
//---------------------------------------------------------------------------------------------------------------------------//
// One-time initialisation: opens the serial link and starts every peripheral.
// Start_ADC(), Start_uSD() and initRTC() are defined outside this excerpt.
void setup() {
Serial.begin(9600); // serial link at 9600 baud (the Python interface defaults to the same rate)
DS18B20.begin(); // initialize the DS18B20 sensor
Wire.begin(); // start the I2C bus before the I2C peripherals are initialised
Start_ADC();
Start_uSD();
initRTC();
}
//---------------------------------------------------------------------------------------------------------------------------//
// Main loop: two independent, non-blocking periodic tasks driven by millis().
void loop() {
  // Task 1: every delai_ACQ ms, sample the sensors and print the values on the serial link.
  const unsigned long sinceAcquisition = millis() - Clck_Aqc;
  if (sinceAcquisition >= delai_ACQ) {
    Clck_Aqc = millis();
    Lire_ADC();
    AfficheValeurs();
  }

  // Task 2: every delai_uSD ms, append the data to the CSV file on the SD card.
  const unsigned long sinceSdWrite = millis() - Clck_uSD;
  if (sinceSdWrite >= delai_uSD) {
    Clck_uSD = millis();
    Enregistrement_SD();
  }
}
Void AfficheValeurs(){
// Affiche dans le moniteur série
Serial.print("DO : ");
Serial.print(DO_value);
Serial.print(", ");
Serial.print("pH : ");
Serial.print(pH_value);
Serial.print(", ");
Serial.print("Temperature : ");
Serial.print(Temperature);
Serial.print("C");
Serial.print("\n");
} // void AfficheValeurs
Python:
from settings import *
from main_menu import MainMenu
from graphs import Graph4H, Graph24H
from log_menu import LogMenu
#TODO:
"""
Add a settings menu for:
- autosaving capacity (switch)
- Changing the port (combobox)
- changing the baudrate (combobox)
"""
class App(ctk.CTk):
    """Main window of the bioreactor monitoring interface.

    The window hosts four tabs (menu, 4 h graph, 24 h graph, error log),
    reads "DO / pH / Temperature" lines from a serial port on a background
    thread, mirrors the latest values in Tk variables and periodically
    appends them to a CSV file.
    """

    def __init__(self):
        """Build the window, then restore today's data and today's log entries."""
        super().__init__()
        self.geometry("900x600+100+50")
        self.title("Bioreacteur")

        self.small_size_font = ctk.CTkFont(size=14)
        self.medium_size_font = ctk.CTkFont(size=18)
        self.large_size_font = ctk.CTkFont(size=24)
        self.huge_size_font = ctk.CTkFont(size=36)

        self.has_exited = False
        self.serial_port = None
        self.serial_thread = None
        self.last_opened_tab = None

        # State used by the logging and autosave helpers. It is defined before
        # the widgets are created so that no callback can hit a missing attribute.
        self.logs = []
        self.day_data = {}
        self.enregistrement_auto_active = False
        self._auto_save_job = None  # id of the pending `after` autosave call

        self.port = ctk.StringVar(value="COM4")
        self.baudrate = ctk.IntVar(value=9600)
        self.do = ctk.DoubleVar()
        self.ph = ctk.DoubleVar()
        self.temp = ctk.DoubleVar()
        # Keep the menu labels in sync with the variables.
        self.do.trace_add("write", lambda *args: self.menu_frame.do_label.configure(text=f"DO : {self.do.get()}"))
        self.ph.trace_add("write", lambda *args: self.menu_frame.ph_label.configure(text=f"pH : {self.ph.get()}"))
        self.temp.trace_add("write", lambda *args: self.menu_frame.temperature_label.configure(text=f"Temperature : {self.temp.get()}"))

        self.create_widgets()

        # csv_filename = f"data/d-{datetime.now().strftime('%d-%m-%Y')}.csv" if seperated method is wanted
        if self.file_check(CSV_FILENAME):
            self.day_data = self.load_data("4H")

        # Replay the entries already logged today so they stay visible and
        # are written back when the window closes.
        log_filename = f"logs/log-{datetime.now().strftime('%d-%m-%Y')}.json"
        if self.file_check(log_filename):
            for log in self.load_logs(log_filename):
                self.log_errors(log)

    def file_check(self, filename: str):
        """Check whether a file exists and create it if it does not.

        A missing parent directory is created as well. A new ``.csv`` file
        receives the column header row.

        Args:
            filename (str): Path to the file.

        Returns:
            bool: True if the file already existed, False if it was just created.
        """
        already_exists = os.path.exists(filename)
        if not already_exists:
            parent = os.path.dirname(filename)
            if parent:
                os.makedirs(parent, exist_ok=True)
            # 'x' = exclusive creation; newline='' as required by the csv module.
            with open(filename, 'x', newline='') as f:
                if filename.endswith('.csv'):
                    csv.writer(f).writerow(["Day", "Timestamp", "DO", "pH", "Temperature"])
        return already_exists

    def create_widgets(self):
        """Create the four tab frames and the bottom row of tab-switching buttons."""

        def open_tab(frame: ctk.CTkFrame, title: str):
            # Show `frame` above the previously opened tab and retitle the window.
            self.title(title)
            frame.place(relx=0, rely=0, relwidth=1, relheight=0.9)
            frame.extra()
            if self.last_opened_tab is not None:
                frame.lift(self.last_opened_tab)
            self.last_opened_tab = frame

        def make_tab_button(text: str, frame: ctk.CTkFrame, title: str):
            return ctk.CTkButton(
                master=self,
                text=text,
                corner_radius=0,
                fg_color=BUTTON_FG_COLOR,
                text_color=BUTTON_TEXT_COLOR,
                command=lambda: open_tab(frame, title),
            )

        # Tab frames
        self.menu_frame = MainMenu(master=self)
        self.graph_4h_frame = Graph4H(master=self)
        self.graph_24h_frame = Graph24H(master=self)
        self.log_errors_frame = LogMenu(master=self)

        # Tab switching buttons
        self.menu_btn = make_tab_button("Menu", self.menu_frame, "Bioreacteur: Menu")
        self.graph_4h_btn = make_tab_button("Graph 4h", self.graph_4h_frame, "Bioreacteur: Graph des 4 dernières heures")
        self.graph_24h_btn = make_tab_button("Graph 24h", self.graph_24h_frame, "Bioreacteur: Graph des 24 dernières heures")
        self.log_errors_btn = make_tab_button("Log Errors", self.log_errors_frame, "Bioreacteur: Log Erreurs")

        # Four equal-width buttons along the bottom edge.
        tab_buttons = (self.menu_btn, self.graph_4h_btn, self.graph_24h_btn, self.log_errors_btn)
        for index, button in enumerate(tab_buttons):
            button.place(relx=index * 0.25, rely=1, relwidth=0.25, relheight=0.1, anchor="sw")

        open_tab(self.menu_frame, "Bioreacteur: Menu")

    def log_errors(self, error_message: Union[Exception, str], custom_error_message: Optional[str] = "Erreur lors de l'analyse des données :"):
        """Print an error, show it in both log widgets and keep it for saving.

        Args:
            error_message: An exception, which is formatted with a timestamp
                and ``custom_error_message``, or an already formatted string,
                which is used verbatim.
            custom_error_message: Prefix used when formatting an exception.
        """
        print(error_message)
        if not isinstance(error_message, str):
            timestamp = datetime.now().strftime('%d/%m/%Y, %H:%M:%S')
            error_message = f"({timestamp}): {custom_error_message}\n{str(error_message)}\n"
        self.menu_frame.log_error_text.insert(ctk.END, error_message)
        self.log_errors_frame.text.insert(ctk.END, error_message)
        self.logs.append(error_message)

    def connect(self, timeout: int = 1):
        """Open the serial port and start reading from it.

        Called when the "connect" button is clicked. Autosaving is enabled
        first; a connection failure is reported in the error log.

        Args:
            timeout: Read timeout of the serial port, in seconds.
        """
        self.toggle_enregistrement_auto()
        print(self.port.get(), self.baudrate.get())
        try:
            # Passing `port` to the constructor normally opens the port already.
            self.serial_port = serial.Serial(port=self.port.get(), baudrate=self.baudrate.get(), timeout=timeout)
            if not self.serial_port.is_open:
                self.serial_port.open()
            # Connected: reflect it in the interface and start fetching data.
            self.menu_frame.connect_btn.configure(text="Connecté", state=DISABLED)
            self.start_reading_serial()
        except Exception as e:
            # Connection failed: report it in the log.
            self.log_errors(e)

    def start_reading_serial(self):
        """Start the background thread that reads the serial port."""
        # Daemon thread: it ends together with the main program.
        self.serial_thread = thread.Thread(target=self.read_serial, daemon=True)
        self.serial_thread.start()

    def enregistrer_csv(self):
        """Append the current DO, pH and temperature values to the CSV file.

        Nothing is recorded while all three values are still 0 (no reading
        received yet). The in-memory data of the current day is updated too.
        """
        self.do_value = self.do.get()
        self.ph_value = self.ph.get()
        self.temperature = self.temp.get()
        if (self.do_value, self.ph_value, self.temperature) == (0, 0, 0):
            return

        current_day, current_time = datetime.now().strftime("%d-%m-%Y, %H:%M:%S").split(", ")

        # The entry for today may not exist yet (new CSV file, or the date
        # changed while the program was running): create it on demand
        # instead of raising KeyError.
        day_data = self.day_data.setdefault(current_day, {})
        samples = (
            ("Timestamp", current_time),
            ("DO", self.do_value),
            ("pH", self.ph_value),
            ("Temperature", self.temperature),
        )
        for category, value in samples:
            day_data.setdefault(category, []).append(value)

        # Append to the same file that is created in __init__ and read by load_data().
        with open(CSV_FILENAME, mode="a", newline="") as file:
            csv.writer(file).writerow([current_day, current_time, self.do_value, self.ph_value, self.temperature])

    def load_data(self, _type: Literal["4H", "24H"]) -> dict[str, dict[str, list[int]]]:
        """Load the recorded measurements from the CSV file.

        Args:
            _type: ``"4H"`` returns only today's rows, with every column kept
                as a list of the raw CSV strings. Any other value (``"24H"``)
                returns every day found in the file, with each measurement
                column reduced to its daily mean rounded to 2 decimals and
                ``"Timestamp"`` kept as a list.

        Returns:
            A mapping ``{day: {column: values}}``; empty when the file is empty.
        """
        output_dict = {}
        with open(CSV_FILENAME, mode="r", newline="") as file:
            data = list(csv.reader(file))
        if len(data) == 0:
            return output_dict

        headers = data[0]
        columns = headers[1:]  # every column except the leading "Day"
        rows = [row for row in data[1:] if row]  # ignore blank lines

        if _type == "4H":
            today = datetime.now().strftime("%d-%m-%Y")
            day_data = output_dict[today] = {header: [] for header in columns}
            for row in rows:
                if row[0] != today:
                    continue
                for header, value in zip(columns, row[1:]):
                    day_data[header].append(value)
            return output_dict

        # 24H: first collect the values of each day...
        for row in rows:
            try:
                parsed = [
                    value if header == "Timestamp" else float(value)
                    for header, value in zip(columns, row[1:])
                ]
            except ValueError as e:
                # A malformed row must not make the whole history unreadable.
                self.log_errors(e)
                continue
            day_data = output_dict.setdefault(row[0], {header: [] for header in columns})
            for header, value in zip(columns, parsed):
                day_data[header].append(value)

        # ...then replace each measurement list by its mean. Averaging only
        # after all rows are read keeps this correct even if the rows of one
        # day are not contiguous in the file.
        for day_data in output_dict.values():
            for header in columns:
                values = day_data[header]
                if header == "Timestamp" or not values:
                    continue
                day_data[header] = round(sum(values) / len(values), 2)
        return output_dict

    def load_logs(self, filename: str) -> list[str]:
        """Read the saved log entries from a JSON file.

        Returns:
            The stored list of entries, or an empty list when the file is
            missing, is not valid JSON, or does not contain a list.
        """
        try:
            with open(filename, "r") as f:
                data = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError) as e:
            # Covers an empty file as well as a missing one.
            self.log_errors(error_message=e, custom_error_message="Loading Error: Could not load logs from file:")
            return []
        return data if isinstance(data, list) else []

    def save_logs(self, filename: str, data: list[str]):
        """Write the log entries to ``filename`` as JSON; failures are logged."""
        try:
            with open(filename, "w") as f:
                json.dump(data, f)
        except Exception as e:
            self.log_errors(e)

    def read_serial(self):
        """Read lines from the serial port until the application exits.

        Runs on the background thread. Each complete line is decoded and
        handed to :meth:`analyze_data`. Bytes that are not valid UTF-8 (for
        example noise emitted while the board resets) are replaced instead of
        raising, so one corrupted frame no longer stops the acquisition. A
        failure of the port itself is logged and ends the thread.

        NOTE(review): widgets and Tk variables are updated from this thread;
        Tkinter is not guaranteed to be thread-safe, consider marshalling the
        updates through ``after``.
        """
        pending = b""
        while not self.has_exited:
            try:
                # Read once per iteration: a second readline() used only for
                # printing would consume, and lose, every other frame.
                pending += self.serial_port.readline()
            except Exception as e:
                # Port failure. Stay silent if the window is already closing.
                if not self.has_exited:
                    self.log_errors(e)
                    self.has_exited = True
                return

            if not pending.endswith(b"\n"):
                # Timeout in the middle of a line: wait for the rest, but do
                # not buffer endless noise that never contains a newline.
                if len(pending) > 4096:
                    pending = b""
                continue

            line, pending = pending, b""
            data = line.decode("utf-8", errors="replace").strip()
            if data:
                self.analyze_data(data)

    def analyze_data(self, data: str):
        """Parse one serial line and update the DO, pH and temperature variables.

        Expected layout: ``"DO : 1.23, pH : 7.00, Temperature : 25.00C"``.
        A line that cannot be parsed is reported in the error log.
        """
        try:
            if data == "":
                return
            # Three comma-separated "label : value" fields.
            do, ph, temperature = data.split(",")
            do = do.split(":")[1]
            ph = ph.split(":")[1]
            # Drop only the trailing unit. The line is already stripped, so
            # slicing two characters off would also remove the last digit.
            temperature = temperature.split(":")[1].strip().rstrip("C")

            do = round(float(do), 2)
            ph = round(float(ph), 2)
            temperature = round(float(temperature), 2)

            self.do.set(do)
            self.ph.set(ph)
            self.temp.set(temperature)
        except Exception as e:
            # Boundary of the reader thread: report the problem and keep reading.
            self.log_errors(e)

    def toggle_enregistrement_manuel(self):
        """Switch to manual saving: stop autosaving and enable the save button."""
        self.enregistrement_auto_active = False
        self.menu_frame.save_btn.configure(state="normal")

    def toggle_enregistrement_auto(self):
        """Switch to automatic saving and disable the manual save button."""
        self.enregistrement_auto_active = True
        self.menu_frame.save_btn.configure(state="disabled")  # avoid it being pressed again
        self.start_auto_enregistrement()

    def start_auto_enregistrement(self):
        """Save the current values now and once per second afterwards.

        The loop continues only while autosaving is enabled AND the
        application is still running. At most one loop is kept alive:
        calling this method again replaces the pending timer.
        """
        if self._auto_save_job is not None:
            self.after_cancel(self._auto_save_job)
            self._auto_save_job = None
        if self.enregistrement_auto_active and not self.has_exited:
            try:
                self.enregistrer_csv()
            except OSError as e:
                # A failed write must not stop the periodic saving.
                self.log_errors(e)
            self._auto_save_job = self.after(1000, self.start_auto_enregistrement)

    def destroy(self):
        """Stop the background work, persist today's logs and close the window."""
        # Tells the reader thread and the autosave loop to stop.
        self.has_exited = True
        try:
            if self.serial_port is not None and self.serial_port.is_open:
                self.serial_port.close()
        except Exception as e:
            self.log_errors(e)
        try:
            log_filename = f"logs/log-{datetime.now().strftime('%d-%m-%Y')}.json"
            # Make sure the file and its directory exist, then always save.
            # The date may have changed since start-up, in which case the
            # file is new and must still receive the entries.
            self.file_check(log_filename)
            self.save_logs(log_filename, self.logs)
        except Exception as e:
            self.log_errors(e)
        super().destroy()
if __name__ == "__main__":
    # Script entry point: create the main window and run the Tk event loop
    # until the window is closed.
    app = App()
    app.mainloop()
找到问题的解决方案了! ESP32 EN/RESET 引脚上的 10μF 电容阻碍了 python 接口接收数据,将其取下即可解决问题。尽管如此,还是感谢您的帮助。