我对 python 比较陌生,正在尝试制作一个新程序来与 canbus 交互。我一直在研究使用 SocketCAN 接口的 python-can 的文档和一些示例,但遇到了一些问题。
我已确认硬件正常工作。我有一个单独的设备来将测试模式传输到 CANbus 上,并且当使用默认侦听器“can.Printer()”时,传输到 CANbus 上的预期消息会正确打印在终端中。
我的下一步是创建一个新的侦听器函数,该函数最终将处理从 CANbus 接收到的消息(我的代码中的 msg_rx_routine 函数)。当使用我现在的代码时,通常会打印第一个收到的 CAN 消息,但随后会出现“非类型”对象不可调用的回调错误。
我的硬件: -rPi CM4 + I/O扩展板 -MCP2515 分线板
测试代码:
import can
import time
import os
#--------define support functions
"""
FUNCTION: message receive routine
@ARG can_bus: passed can object
RETURN: none
"""
def msg_rx_routine(can_bus):
rx_msg = can_bus.recv()
print(rx_msg)
#--------CANBUS init
os.system("sudo /sbin/ip link set can0 up type can bitrate 500000") #bring up can0 interface at 500kbps
time.sleep(0.05) #brief pause
CAN1 = can.interface.Bus(channel='can0', bustype='socketcan') #instance CAN object
#TODO: configure CAN connection (masks, etc)
#CAN1_listener = can.Printer() #assign default "printer" listener to print to terminal
CAN1_listener = msg_rx_routine(CAN1) #assign message handler function as a listener
CAN1_notifier = can.Notifier(CAN1, [CAN1_listener]) #assign listener to notifier
def main():
while True:
pass #just dummy loop to wait on a break command
if __name__ == "__main__":
main()
我觉得我显然误解了这个作品的“监听器/通知器”功能的核心概念,这让我感到困惑。我现在(试图)理解这一点就像微控制器的 C 代码中的 IRQ 请求如何工作一样。给定中断(通知程序),它将调用一些链接函数(侦听器)。
如前所述,我想要实现的是: -等待消息 -on消息接收、通话功能 -函数对消息执行某些操作
我看到here有一个类似的讨论,有人在做同样的事情。我(认为)他们传递
parse_data
函数只是 can
数据类型,对吗?虽然说你在哪条总线上寻找消息更有意义,但为什么你只将通用的 can
类型传递给函数,对我来说没有意义。我不确定这是否只是我对我发现的许多示例中的某些内容或非唯一变量命名的误解。
编辑-1:
在做了更多阅读之后,我意识到我在一个相当“doh”的时刻误解了原始代码。代码中的
CAN1_listener = can.Printer()
部分只是创建一个 can.Printer
类的新对象,然后将其分配给通知程序。我试图让我自己的课程基本上复制它,但我仍然遇到问题:
class can_msg_handler:
def __init__( self, **kwargs) -> None:
pass
def on_message_reveived(self, msg: can.Message) -> None:
print(msg)
当将新类与
CAN1_listener = can_msg_handler()
使用时,我现在收到“对象不可调用”错误。显然,用 py 可以阅读/学习更多内容,所以我会坚持下去。
如 can.Notifier
的
documentation中所述,每个
listener
必须是 can.Listener 的实例,或者是采用 can.Message
作为第一个参数的可调用对象。
您可以通过继承
can_msg_handler
来使用您的类can.Listener
:
class can_msg_handler(can.Listener):
def __init__( self, **kwargs) -> None:
pass
def on_message_reveived(self, msg: can.Message) -> None:
print(msg)
...
CAN1_listener = can_msg_handler
或者您将函数
msg_rx_routine
作为侦听器传递:
CAN1_listener = msg_rx_routine