所以我正在尝试使用 python 从串口读取数据。
我计划将来将这些数据提供给我的异步服务器。 来自 c#,我总是有上课的冲动。 然而,当我运行代码时,我得到了循环已经在运行的异常。
据我所知,任务将被放入循环然后执行,所以我想我在这里遗漏了一些东西。
从最佳实践方法来看,像这样在 python 中创建类是否有意义,或者是否有“更好”的方法?我感谢所有输入。
我做了一个类,它应该处理从串口读取的数据,稍后应该将值分配给 opc 服务器上的节点。
对于阅读,我使用了此处制作的示例 pySerialDocumentation
import asyncio
import serial_asyncio
from async_serial_output import OutputProtocol,InputChunkProtocol
import logging
class EboxAndino():
"""
Class which manages the connection to the serial port
"""
ebox = None
timeout: int
counter1Opc = None
counter2Opc = None
input1Opc = None
input2Opc = None
Heartbeat = None
logger = None
debug = False
setup_dict = {}
counterValueDict = {}
hwPort = None
baud = None
timeout = None
debug = False
logger = logging.getLogger('foo')
hBeat = False
ser = None
rts = False
def __init__(self, ebox, hwPort, baud, timeout, debug, loggerName) -> None:
"""
@param ebox: opcObj
@param hwPort: string
@param baud: int
@param timeout: int
@param debug: bool
@param loggerName: string
"""
# self = EboxAndino('Ebox')
self.ebox = ebox
self.hwPort = hwPort
self.baud = baud
self.timeout = timeout
self.debug = debug
self.hBeat = False
self.ser = None
self.rts = False
print('created!')
async def initCounter(self):
self.hBeat = False
self.counterValueDict = {"Counter1": None,
"Counter2": None,
"Input1": None,
"Input2": None,
}
setup_list = [*self.counterValueDict]
print ('Counter is up!')
def get_values(self):
if self.counterValueDict:
return self.counterValueDict
else:
return None
async def line_reader(self,loop):
coro = await serial_asyncio.create_serial_connection(loop, OutputProtocol, '/dev/ttyAMA0', baudrate = 38400)
await asyncio.sleep(1)
transport, protocol = loop.run_until_complete(coro)
await asyncio.sleep(0.5)
loop.run_forever()
loop.close()
我的主要蜜蜂
async def main():
loop = asyncio.get_event_loop()
andino = EboxAndino('foo', '/dev/ttyAMA0', 38400, 1, True, 'foo')
await andino.initCounter()
await andino.line_reader(loop)
loop = asyncio.get_event_loop()
asyncio.sleep(2)
loop.run_until_complete(main())
运行它会把第一行分成 4 个收到的包。 之后它会得到一个异常,表明循环已经在运行。 为什么?
作为后续问题,抓取值并将值存储到 opc 服务器的最佳位置在哪里?
谢谢大家!
在 asyncio 中你只能运行一个循环。 您使用的示例是针对同步上下文的。但是您现在已经处于异步上下文中。所以不需要调用 loop.run_until.
async def line_reader(self,loop):
transport, protocol = await serial_asyncio.create_serial_connection(loop, OutputProtocol, '/dev/ttyAMA0', baudrate = 38400)
await asyncio.sleep(1)
await asyncio.sleep(0.5)