io.TextIOWrapper, AttributeError: 'function' 对象没有属性 'TextIOWrapper'

问题描述 投票:0回答:0

我正在尝试使用 Python

multiprocessing
PyQt5
(在 Windows 上)实时绘制串行数据。

我必须显示从加速度计传感器读取的数据,但我的

io
库有问题,我不知道如何解决它。

这是代码:

# -*- coding: utf-8 -*-
from pyqtgraph.Qt import QtGui
from PyQt5 import QtWidgets, QtCore
import numpy as np
import pyqtgraph as pg
from multiprocessing import Process, Manager, Queue
import sched, time, threading, sys, traceback
import serial
import io

# This function is responsible for displaying the data
# it is run in its own process to liberate main process
def display(name,q):
    app2 = QtWidgets.QApplication([])

    win2 = pg.GraphicsLayoutWidget(title="Basic plotting examples")
    win2.resize(1000,600)
    win2.setWindowTitle('pyqtgraph example: Plotting')
    p2 = win2.addPlot(title="Updating plot")
    curve = p2.plot(pen='y')

    x_np = []
    y_np = []

    def updateInProc(curve,q,x,y):
        item = q.get()
        x.append(item[0])
        y.append(item[1])
        curve.setData(x,y)

    timer = QtCore.QTimer()
    timer.timeout.connect(lambda: updateInProc(curve,q,x_np,y_np))
    timer.start(50)
    win2.show()
    app2.exec()

# This is function is responsible for reading some data (IO, serial port, etc)
# and forwarding it to the display
# it is run in a thread
def io(running,q):
    data_string = '0'

    ser = serial.Serial('COM8', 38400, timeout=10)
    sio = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
    ser.close()
    ser.open()

    t = 0
    while running.is_set():
        data_string = sio.readline() # example: 'x: -0.00292     y: -0.01562     z: 0.97119'
        x = float(data_string[data_string.find('x')+2 : data_string.index('y')])
        #y = float(data_string[data_string.find('y')+2 : data_string.index('z')])
        #z = float(data_string[data_string.find('z')+2 : ])
        t += 0.01
        q.put([t,x])
        time.sleep(0.01)
    ser.close()
    print("Done")

if __name__ == '__main__':
    q = Queue()
    # Event for stopping the IO thread
    run = threading.Event()
    run.set()

    # Run io function in a thread
    t = threading.Thread(target=io, args=(run,q))
    t.start()

    # Start display process
    p = Process(target=display, args=('bob',q))
    p.start()
    input("See ? Main process immediately free ! Type any key to quit.")
    run.clear()
    print("Waiting for scheduler thread to join...")
    t.join()
    print("Waiting for graph window process to join...")
    p.join()
    print("Process joined successfully. C YA !")

    exit()

这里是回溯

Exception in thread Thread-1 (io):
Traceback (most recent call last):
  File "C:\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "C:\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Desktop\LISA\Real-time-plotter\pygraph-test.py", line 44, in io
    sio = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
          ^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute 'TextIOWrapper'

在另一个文件中,我尝试从串口读取一行,它工作正常:

import serial
import io

data_string = '0'

ser = serial.Serial('COM8', 38400, timeout=10)
sio = io.TextIOWrapper(io.BufferedRWPair(ser, ser))
ser.close()
ser.open()

data_string = sio.readline() # example: 'x: -0.00292     y: -0.01562     z: 0.97119'

print(data_string)

x = float(data_string[data_string.find('x')+2 : data_string.index('y')])
y = float(data_string[data_string.find('y')+2 : data_string.index('z')])
z = float(data_string[data_string.find('z')+2 : ])

print(x,y,z)

ser.close()

这是我的第一个Python项目,请大家多多指教!

python-3.x io pyqt5 multiprocessing pyserial
© www.soinside.com 2019 - 2024. All rights reserved.