USB Python 脚本问题 Raspberry Pi 操作系统

问题描述 投票:0回答:1

我遇到一个问题,我试图在后台触发 Python 脚本 最多可插入四个 USB 设备,所有设备均具有相同的 USB 供应商和产品 ID。

为了实现此目的,我在

/etc/udev/rules.d/myrule.rules
中创建了一条 udev 规则,如下所示:

ACTION=="add", SUBSYSTEM=="usb", ATTR{idVendor}=="1234", ATTR{idProduct}=="5678", ENV{SYSTEMD_WANTS}="usb-trigger.service"

1234 和 5689 只是示例。

此外,我在

/etc/systemd/system/usb-trigger.service
中设置了一个 systemd 服务,配置如下:

[Unit]
Description=USB Trigger Service

[Service]
Type=oneshot
ExecStart=/path/to/my/python/script/uart.py

[Install]
WantedBy=multi-user.target

但是,脚本有时执行,有时不执行。此外,如果我插入多个第一个设备或第二个设备,后台脚本将无法启动。有时,所有设备都无法按预期工作。

我的 Python 脚本的主要目标是在插入 USB 设备时生成 CSV 文件,保存数据,并在拔下设备时关闭文件。我需要这个脚本来处理最多四个设备。

这是我正在使用的Python脚本:

python
import serial
import serial.tools.list_ports
import time
import csv
import datetime
import sys

def get_current_time():
    current_time = time.time()
    milliseconds = int((current_time - int(current_time)) * 1000)
    formatted_time = time.strftime("%H:%M:%S", time.localtime(current_time))
    formatted_time += f".{milliseconds:03d}"  # Adding milliseconds
    return formatted_time

def write_to_csv(writer, data):
    try:
        writer.writerow(data)
    except Exception as e:
        print(f"Error writing to CSV: {e}")

def find_com_port():
    try:
        com_ports = serial.tools.list_ports.comports()
        for port in com_ports:
            if port.device.startswith('/dev/ttyUSB') or port.device.startswith('/dev/ttyACM'):
                return port.device
    except Exception as e:
        print(f"Error finding COM port: {e}")

def main():
    try:
        while True:
            com_port = find_com_port()
            if com_port is not None:
                print(f"COM port found: {com_port}")
                break  # Exit the loop once a COM port is found
            else:
                print("No COM port found. Retrying...")
                time.sleep(1)      

        if com_port is None:
            print("No COM port found.")
            return

        filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".csv"
        bottle_id = None
        bottle_id_collected = False  # Flag to track if bottle ID has been collected

        with serial.Serial(port=com_port, baudrate=460800, timeout=2) as ser, open(filename, 'w', newline='') as file:
            print(f"Connected to {com_port} successfully.")
            writer = csv.writer(file)

            while True:
                received_bytes = ser.readline().decode('ascii', errors='replace').strip()
                if received_bytes:
                    if not bottle_id_collected and 'Bottle ID' in received_bytes:
                        bottle_id = received_bytes.split('Bottle ID')[1].strip()
                        writer.writerow(['Bottle ID', bottle_id])
                        writer.writerow(['Input 1', 'Input 2', 'Input 3', 'time stamp'])
                        bottle_id_collected = True
                    else:
                        parts = received_bytes.split(', ')
                        try:
                            numbers = [float(part) for part in parts]
                            data_row = numbers + [get_current_time()]
                            writer.writerow(data_row)
                            print(f"Writing to CSV: {data_row}") 
                            file.flush()  # Force flushing
                        except ValueError as e:
                            pass
                time.sleep(0.01)

    except serial.SerialException as e:
        print(f"Error opening serial port {com_port}: {e}")

    except KeyboardInterrupt:
        print("Program terminated by user.")

    except Exception as e:
        print(f"An error occurred: {e}")

    finally:
        try:
            if 'ser' in locals() and ser.is_open:
                ser.close()
        except Exception as e:
            print(f"Error closing serial port: {e}")

        try:
            if 'file' in locals() and not file.closed:
                file.close()


        except Exception as e:
            print(f"Error closing CSV file: {e}")

        sys.exit()

if __name__ == "__main__":
    main()

任何关于脚本行为不一致的原因以及我如何确保它在插入多个 USB 设备时可靠运行的见解将不胜感激。谢谢!

我配置了 udev 规则和 systemd 服务,以便在插入具有相同 ID 的多个 USB 设备时触发 Python 脚本。我希望该脚本能够可靠地为每个设备生成 CSV 文件,同时处理最多四个设备。但是,脚本有时无法启动,尤其是插入多个设备时,会导致行为不一致。

python raspberry-pi usb systemd udev
1个回答
0
投票

如果您想要多个实例,您需要显式地将您的.service 转换为多实例(模板化)单元。否则,如果一个实例仍在运行,第二次启动将不会执行任何操作。这适用于所有服务类型 – 甚至适用于 Type=oneshot 服务。

  1. 将您的单位重命名为

    [email protected]

  2. 将 udev 规则更改为

    ENV{SYSTEMD_WANTS}="usb-trigger@%k.service"

  3. 更改 ExecStart 以将

    %i
    作为参数传递给脚本。

    ExecStart=/path/to/my/python/script/uart.py %i
    
  4. 更改脚本以从

    sys.argv
    读取设备名称(最好使用
    argparse
    )。

    parser = argparse.ArgumentParser()
    parser.add_argument("device")
    args = parser.parse_args()
    
    print(f"I was called for {device!r}", file=sys.stderr) # ¹
    
  5. 从脚本中删除现在不需要的“find_com_port()”代码。


¹ (就此而言,您确实应该使用 stderr 来显示错误消息;您应该将 stdout 配置为行缓冲,否则消息可能会被缓冲并且不会立即显示在日志中;理想情况下,您应该通过以下方式使用 syslog

syslog.syslog()
而不是所有这些。)

© www.soinside.com 2019 - 2024. All rights reserved.