我使用了 Raspberry Pi 并在 Thonny IDE 中执行了我的 Python 脚本。该脚本包含用于从 UWB ESP32 设备接收数据的套接字模块,这些设备可使用 Arduino 进行编程。
使用当前的 OOP python 代码,我可以观察到,通过在“self.sock = socket.socket(socket .AF_INET,套接字.SOCK_STREAM)”。
已添加行
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
这是该代码的完整代码。另外,我不太确定我是否正确实现了套接字连接的关闭。但目前正在工作。
OOP Python 代码
import socket
import json
import cmath
import os
import time
class UWB:
def __init__(self):
self.connect_network()
self.line = []
def connect_network(self):
self.interface = 'wlan0' # Raspberry Pi Network Interface name
# Get the IP address for the specified interface
try:
self.UDP_IP = os.popen(f"ifconfig {self.interface} | grep 'inet ' | awk '{{print $2}}'").read().strip()
print("***Local IP:" + str(self.UDP_IP) + "*** \n")
except Exception as e:
print("Error at IP: {e}")
self.UDP_PORT = 8080 # Non-priveleged
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.UDP_IP, self.UDP_PORT))
self.sock.listen(1)
print("[---STATUS---] Waiting to secure connection... \n")
self.data, self.addr = self.sock.accept() #
print("[---STATUS---] Accepted Connection from Foreign IP: " + str(self.addr))
# Read UWB data from the network and return a list of UWB data
def read_data(self):
self.line = self.data.recv(1024).decode('UTF-8') # Read data from the network socket
self.uwb_list = []
try:
self.uwb_data = json.loads(self.line) # Parse the received JSON data
self.uwb_list = self.uwb_data["links"] # Extract the "links" field from the JSON data
except:
pass
return self.uwb_list
def start_uwbProcess(self):
while True:
self.line = self.read_data()
print("line", self.line)
time.sleep(0.05)
def get_uwbData(self):
return self.line
def uwbClose(self):
print("Sock Closed")
self.sock.close()
# Initialization
system = UWB()
try:
system.start_uwbProcess()
except OSError as e:
print("Error: {e}")
system.uwbClose()
except KeyboardInterrupt:
print("1) System stopped!..")
system.uwbClose()
finally:
print("2) System stopped!..")
system.uwbClose()
这样,我尝试将 UWB 类的流程和数据调用到 ROBOTCONTROLLER 类。我为此使用了带有 Process 和 Queue 的多处理模块。
但是,现在我再次遇到该错误“OSError:[Errno 98]地址已在使用中”。首先,它会连接并且数据传输正常,就像我可以使用 OOP Python 代码生成的那样,但是在停止并再次运行程序后,我无法再次连接到相同的端口地址。
这是我当前的多处理代码。
多处理
import socket
import json
import cmath
import os
import time
class UWB:
def __init__(self, result_queue):
self.result_queue = result_queue
self.connect_network()
self.list = []
def connect_network(self):
self.interface = 'wlan0' # Raspberry Pi Network Interface name
# Get the IP address for the specified interface
try:
self.UDP_IP = os.popen(f"ifconfig {self.interface} | grep 'inet ' | awk '{{print $2}}'").read().strip()
print("***Local IP:" + str(self.UDP_IP) + "*** \n")
except Exception as e:
print("Error at IP: {e}")
self.UDP_PORT = 8080 # Non-priveleged
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.UDP_IP, self.UDP_PORT))
self.sock.listen(1) # No. of Connected Device
print("[---STATUS---] Waiting to secure connection... \n")
self.data, self.addr = self.sock.accept() #
print("[---STATUS---] Accepted Connection from Foreign IP: " + str(self.addr))
# Read UWB data from the network and return a list of UWB data
def read_data(self):
self.line = self.data.recv(1024).decode('UTF-8') # Read data from the network socket
self.uwb_list = []
try:
self.uwb_data = json.loads(self.line) # Parse the received JSON data
self.uwb_list = self.uwb_data["links"] # Extract the "links" field from the JSON data
except:
pass
return self.uwb_list
def start_uwbProcess(self):
while True:
self.list = self.read_data()
get_uwbData = self.list
self.result_queue.put(get_uwbData)
time.sleep(0.05)
def uwbClose(self):
print("Sock Closed")
self.sock.close()
import RPi.GPIO as GPIO
import math
import multiprocessing
import queue
import turtle
import sys
import time
class ROBOTCONTROLLER:
def __init__(self):
self.initialize_pushbuttonMode()
self.initialize_interfaces()
# Initializing the debouncing variables
# For Start
self.startLastState = True
self.startLastTime = 0
# For Resume
self.resumeLastState = True
self.resumeLastTime = 0
# For Debounce Delay in [BUTTONS]
self.debounceDelay = 50 # Can adjust the debounce delay as needed (in milliseconds)
self.processState = False
self.onGoing = False
self.waitCnt = 0
#self.isDeliveryComplete = False # Will decide if will removed, due to isTargetAchived in UWBdata
# Not used Yet
self.prev_lidar_cycle = 0
self.prev_uwb_cycle = 0
######
self.cmd = None
self.lastcmd = None
def initialize_pushbuttonMode(self):
# Initialize PushButton Mode
print("Setting Up Pushbutton Mode")
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
def initialize_interfaces(self):
self.uwb_result_queue = multiprocessing.Queue()
self.uwbMultiProcess = multiprocessing.Process(target=self.uwbProcessFunction, args=(self.uwb_result_queue,))
self.startMultiProcessing = False
def uwbProcessFunction(self, result_queue):
self.uwb = UWB(result_queue)
self.uwb.start_uwbProcess()
def pushbutton_init(self):
# Define Button pin setting:
GPIO.setup(5, GPIO.IN) # START BUTTON: EXTERNAL-PULLUP
GPIO.setup(6, GPIO.IN) # RESUME BUTTON: EXTERNAL-PULLUP
def run(self):
try:
# Activating System's Process
while True:
if not self.processState:
# Readings of Start Button
curTime = int(time.time() * 1000) # Convert current time to milliseconds
startCurState = GPIO.input(5) # Read the current state of the start button
print("CurStart: " + str(startCurState))
print("Process: " + str(self.processState)) # Already Initialized in function "def initialize_variables(self)"
if startCurState != self.startLastState:
self.startLastTime = curTime
if (curTime - self.startLastTime) > self.debounceDelay:
if startCurState == False: #ON
print("START BUTTON is PRESSED")
self.processState = True #ON
print("PROCESSING...")
else: #==True OFF
print("PRESS the START BUTTON")
self.startLastState = startCurState
else: #self.processState == True:
if self.onGoing:
self.lidarUWBMultiProcessingCallback()
#self.control_center() -> Moved inside lidarUWBMultiProcessingCallback()
else:
curTime = int(time.time()*1000)
resumeCurState = GPIO.input(6)
print("CurResume: ", resumeCurState)
if resumeCurState != self.resumeLastState:
self.resumeLastTime = curTime
if (curTime - self.resumeLastTime) > self.debounceDelay:
if resumeCurState == False: # ON
print("RESUME BUTTON is PRESSED")
self.onGoing = True
else:
print("PRESS the RESUME BUTTON")
self.resumeLastState = resumeCurState
time.sleep(0.05) #Added for incrementing of cycle counts atomically
except Exception as e:
print(f"An error occured: {e}")
def lidarUWBMultiProcessingCallback(self):
if not self.startMultiProcessing:
self.uwbMultiProcess.start()
self.startMultiProcessing = True
print("LidarUWBMultiProcessing started.")
else:
#print("LidarUWBThread is already running")
pass
self.control_center()
def control_center(self):
if not self.uwb_result_queue.empty():
uwbData = self.uwb_result_queue.get_nowait()
print(uwbData)
if __name__ == '__main__':
# Initialization
robot = ROBOTCONTROLLER()
robot.pushbutton_init()
# Activating System's Process
robot.run()
我尝试再次运行以再次运行 OOP Python 代码,但如果我尝试首先使用多处理代码运行代码,OOP python 代码也不会连接,但反之亦然,当首先运行 OOP python 代码时,它会连接在运行多处理代码之前顺利进行。
当错误显示时,我唯一的解决方案是重新启动我的树莓派,这是非常耗时的。这是下面的回溯。
多处理回溯
RESUME BUTTON is PRESSED
LidarUWBMultiProcessing started.
***Local IP:192.168._.___***
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
self.run()
File "/usr/lib/python3.11/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/rome/Desktop/MP_UWB_INPROGRESS2.py", line 105, in uwbProcessFunction
self.uwb = UWB(result_queue)
^^^^^^^^^^^^^^^^^
File "/home/rome/Desktop/MP_UWB_INPROGRESS2.py", line 11, in __init__
self.connect_network()
File "/home/rome/Desktop/MP_UWB_INPROGRESS2.py", line 26, in connect_network
self.sock.bind((self.UDP_IP, self.UDP_PORT))
OSError: [Errno 98] Address already in use
希望任何可以帮助我解决这个问题的回复。谢谢。
最后,在 RPI 终端中使用此命令解决了问题
fuser -k 8080/tcp
并使用 subprocess 模块在我的 python 脚本中调用它并具有重试功能。