PySerial Arduino串口同时读写文件问题。

问题描述 投票:0回答:1

所以我有一段代码,------从Arduino读取串行输入,当特定的键被按下时,将串行数据写入Arduino,我还想增加一个功能,将串行数据写入文本文件。我已经采用了线程来实现这些操作。但是我有一个很大的问题,因为我无法向文件中写入任何内容。这是我正在使用的代码。

from pynput import keyboard
import threading
import serial
import sys
import io


ser = None






class SerialReaderThread(threading.Thread):


    def run(self):

        global ser

        ser = serial.Serial('COM4', baudrate = 9600, timeout = 5)

        while True:

            print(ser.readline().decode('utf-8'))


class FileWriting(threading.Thread):


   def run(self):

       while True:
             with io.open("output.txt", "a", encoding="utf-8") as f:
                    f.write(ser.readline().decode('utf-8'))


class KeyboardThread(threading.Thread):

    def run(self):

        def on_press(key):

            try:
                format(key.char)

                if key.char == "1":
                    ser.write(b'1\r\n') #serial write - 1

                elif key.char == "2":
                    ser.write(b'2\r\n') #serial write - 2

                elif key.char == "3":
                    ser.write(b'3\r\n') #serial write - 3

                elif key.char == "4":
                    ser.write(b'4\r\n') #serial write - 4

                elif key.char == "5":
                    ser.write(b'5\r\n') #serial write - 5    

                elif key.char == "6":
                    ser.write(b'6\r\n') #serial write - 6

                elif key.char == "0":
                    ser.write(b'0\r\n') #serial write - 0      
            except AttributeError:
                format(key)





        with keyboard.Listener(on_press=on_press) as listener:
            listener.join()

        listener = keyboard.Listener(on_press=on_press)
        listener.start()


serial_thread = SerialReaderThread()
keyboard_thread = KeyboardThread()

file_thread = FileWriting()



serial_thread.start()

keyboard_thread.start()
file_thread.start()


serial_thread.join()
keyboard_thread.join()
file_thread.join()

当我运行这段代码时 读取数据和向Arduino写入串行数据一样完美 但正如我前面所说的 向文本文件写入数据时却不行 这让我产生了这个错误

Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\Users\Tsotne\AppData\Local\Programs\Python\Python38-32\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\Tsotne\AppData\Local\Programs\Python\Python38-32\project\menucode.py", line 44, in run
    f.write(ser.readline().decode('utf-8'))
AttributeError: 'NoneType' object has no attribute 'readline'

如果有人知道解决这个问题的方法,请任何帮助是感激的。

先谢谢你

python serialization arduino com pyserial
1个回答
0
投票

你不能有2个串行实例指向同一个端口,你不能同时读和写......你需要做一些类似下面的事情。

import threading
import serial,time

class MySerialWorker(object):
    pending_commands = [] # just commands
    pending_queries = [] # commands awaiting responses
    def __init__(self,*args,**kwargs):
        # you might actually need to create this in the thread spawned later
        self.ser = serial.Serial(*args,**kwargs)

    def send_command(self,cmd):
        self.pending_commands.append(cmd)

    def query_command(self,cmd,callback):
        self.pending_queries.append([cmd,callback])

    def _do_query(self,cmd):
        self.ser.write(cmd)
        time.sleep(0.01) # give it a second
        return self.ser.readline() # since you are expecting a newline at end of response

    def main_loop(self):
        while self.ser.isOpen():
            while self.pending_queries: # handle these first
                cmd,callback = self.pending_queries.pop(0)
                callback(cmd,self._do_query(cmd))

            if self.ser.inWaiting(): # check for any pending incomming messages
                message_from_arduino = self.ser.readline()
                print("Arduino says:",message_from_arduino)
            while self.pending_commands: # send any commands we want
                # write any pending commands
                self.ser.write(self.pending_commands.pop(0))
            time.sleep(0.01) # sleep a tick

    def run_threaded(self):
        self.thread = threading.Thread(self.main_loop)
        self.thread.start()

    def terminate(self):
        self.ser.close() # closing the port will make the loop exit
        self.thread.join() # wait for thread to exit

class MainProgram():
    def __init__(self):
        self.ser = MySerialWorker("COM5",9600,timeout=1)
        self.pending_command = False

    def on_result(self,cmd,result):
        print("Device responded to %r with %r"%(cmd,result))
        self.pending_command = False

    def main_loop(self):
        self.ser.run_threaded()
        while True:
            if not self.pending_command:
                # await a command
                cmd = input("SENDQUERY>")
                self.pending_command = True
                self.ser.query_command(cmd=cmd+"\n",callback=self.on_result)
            time.sleep(0.01) # sleep a tick

MainProgram().main_loop()
© www.soinside.com 2019 - 2024. All rights reserved.