如何在Raspberry Pi上实现Modbus?

问题描述 投票:0回答:1

我目前正在一个项目中,我试图以Raspberry Pi 4作为主设备来实现Modbus,并作为从设备来控制许多执行器。为此,我为Pi购买了special shield。我运行了一个demo test program,确认Pi可以使用其新的护罩,但随后撞到了墙。

[Shield user manual-在用户手册文件夹中。

Master:

## To install dependencies:
## sudo pip3 install modbus-tk
##################################################################################################
import serial
import fcntl
import os
import struct
import termios
import array
#import modbus lib
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu

# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",9600)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    #set modbus master
    master = modbus_rtu.RtuMaster(
           serial.Serial(port= '/dev/ttySC0',
           baudrate=9600,
           bytesize=8,
           parity='N',
           stopbits=1,
           xonxoff=0)
       )

    master.set_timeout(5.0)
    master.set_verbose(True)
    logger.info("connected")

    logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 4))

    #send some queries
    #logger.info(master.execute(1, cst.READ_COILS, 0, 10))
    #logger.info(master.execute(1, cst.READ_DISCRETE_INPUTS, 0, 8))
    #logger.info(master.execute(1, cst.READ_INPUT_REGISTERS, 100, 3))
    #logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
    #logger.info(master.execute(1, cst.WRITE_SINGLE_COIL, 7, output_value=1))
    #logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 100, output_value=54))
    #logger.info(master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1]))
    #logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 100, output_value=xrange(12)))

#end of if __name__ == '__main__': 

奴隶:

import sys

import serial
import fcntl
import os
import struct
import termios
import array
import time

import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu
# RS485 ioctls
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",9600)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of def rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    logger = modbus_tk.utils.create_logger(name="console", record_format="%(message)s")

    #Create the server
    server = modbus_rtu.RtuServer(serial.Serial('/dev/ttySC1'))

    try:
        logger.info("running...")
        logger.info("enter 'quit' for closing the server")

        server.start()

        slave_1 = server.add_slave(1)
        slave_1.add_block('0', cst.HOLDING_REGISTERS, 0, 100)
        while True:
            cmd = sys.stdin.readline()
            args = cmd.split(' ')

            if cmd.find('quit') == 0:
                sys.stdout.write('bye-bye\r\n')
                break

            elif args[0] == 'add_slave':
                slave_id = int(args[1])
                server.add_slave(slave_id)
                sys.stdout.write('done: slave %d added\r\n' % (slave_id))

            elif args[0] == 'add_block':
                slave_id = int(args[1])
                name = args[2]
                block_type = int(args[3])
                starting_address = int(args[4])
                length = int(args[5])
                slave = server.get_slave(slave_id)
                slave.add_block(name, block_type, starting_address, length)
                sys.stdout.write('done: block %s added\r\n' % (name))

            elif args[0] == 'set_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                values = []
                for val in args[4:]:
                    values.append(int(val))
                slave = server.get_slave(slave_id)
                slave.set_values(name, address, values)
                values = slave.get_values(name, address, len(values))
                sys.stdout.write('done: values written: %s\r\n' % (str(values)))

            elif args[0] == 'get_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                length = int(args[4])
                slave = server.get_slave(slave_id)
                values = slave.get_values(name, address, length)
                sys.stdout.write('done: values read: %s\r\n' % (str(values)))

            else:
                sys.stdout.write("unknown command %s\r\n" % (args[0]))
    finally:
        server.stop()

我打算使用的执行器是Linak LA36。我相信这些是我将要使用的功能:

enter image description here

this文档的第21-22页开始。

[隔离墙只是从Modbus开始。我已经研究了执行器的技术文档以确定要发送的内容,但是在编写程序方面迷失了方向。我曾希望也许能够修改演示程序以适合我的需求,但无法理解其中的代码用法。

[在互联网上搜索,我试图找到有关不同变量和函数的功能的教程或说明,以便更好地理解,但找不到类似的内容。我确实找到了演示代码originated的位置,但找不到/理解那里可以帮助我的任何内容。

我已经看到一些程序应该在Raspberry Pi上启用Modbus(例如PyModbus),但是我不确定我的情况是否有所不同,是否有特殊的屏蔽,以及这些程序是否对我的设置有效?

所以,最后,我在这里希望获得帮助。建议,说明,示例,在此刻,欢迎您提出任何建议,以使我更进一步。也可能是以演示代码为基础是错误的,有人可能会把我指向不同的方向?

我很愿意尝试其他事情,我们将不胜感激。

谢谢你。

更新:

此后我一直在寻找其他选项,偶然发现了我要使用的minimalmodbus。在RS485屏蔽仍处于演示配置的情况下...

RS485 shield

...我一直在尝试执行从Python解释器中的minimalmodbus中找到的一些代码:

>>> import minimalmodbus
>>> instr = minimalmodbus.Instrument('/dev/ttySC0', 1)
>>> instr
minimalmodbus.Instrument<id=0xb7437b2c, address=1, close_port_after_each_call=False, debug=False, serial=Serial<id=0xb7437b6c, open=True>(port='/dev/ttySC0', baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=0.05, xonxoff=False, rtscts=False, dsrdtr=False)>
>>> instr.read_register(24, 1)
5.0
>>> instr.write_register(24, 450, 1)
>>> instr.read_register(24, 1)

我将'/ dev / ttyUSB0'(在原始代码中)更改为'/ dev / ttySC0'。现在我停留在:

>>> instr
    minimalmodbus.Instrument<id=0xb7437b2c, address=1, close_port_after_each_call=False, debug=False, serial=Serial<id=0xb7437b6c, open=True>(port='/dev/ttySC0', baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=0.05, xonxoff=False, rtscts=False, dsrdtr=False)>

哪个给出了SyntaxError:无效语法高亮显示了[[minimalmodbus。

python raspberry-pi modbus rs485 raspberry-pi4
1个回答
0
投票
我认为您需要从简单的东西开始,并在它们之上构建。在您的评论中,您声明要使用

minimmodbus,这很好,但让我们从演示代码开始,然后尝试首先使用执行器。稍后,您可以返回到其他库,例如minimalmodbuspymodbus

在进入代码之前,我认为您应该了解什么是Modbus。本质上,Modbus使用串行端口(它可以通过RS485或更常规的RS232或TTL电平,但这只是物理层,用于传达信息的电气电平;您已经在帽子和帽子上安装了RS485端口)执行器也可以通过RS485进行工作,因此只要您已正确连接了总线(A到A和B到B)就不必担心该端的问题。

那么,除了串行端口外,还需要Modbus吗? Modbus在主从配置下工作。这意味着只有一台主设备(您的情况下是Raspberry Pi计算机)和一台或多台从设备(您的执行器)。根据我们在上一段中所说的,您的Modbus在两线(具有RS485电平)总线上运行。在这种配置下,与更通用的RS232标准相反,在RS232标准中,您拥有三根线:RX,TX和GND,因此无法进行全双工通信(只有主设备或所有从设备之一才能与总线进行通信,而所有其他设备收听,类似于对讲机无线电链接)。为了进一步扩展类比,就像您在WT无线电上需要一个PTT(即按即说)按钮一样,您还需要一个信号将Modbus上的主站或任何从站发送到

PTT

当他们想说话时。某些RS485收发器具有通过硬件实现的功能。在您的帽子上,无需详细检查电路并查看演示代码,似乎总线上的方向控制是通过具有rs485_enable()功能的软件实现的。。 编辑:详细了解硬件,我必须纠正自己:您的RS485帽子实际上是通过带有SPI至双UART SC16IS752的硬件通过硬件进行方向控制的。该芯片被设计为与使用流量控制RTS信号作为方向控制(我们前面提到的PTT功能)的旧UART向后兼容。这就是为什么您需要rs485_enable()足够的理论,现在有实际的部分。您似乎错过了一件重要的事情。在您链接的手册上,在第21页上有以下段落:

在集成到MODBUS系统中之前,必须检查执行器的一些参数并最终更改

。通过使用BusLink PC工具(该工具将在后面详细介绍)来完成准备工作,并保证执行器能够执行基本功能。进一步可能需要进行微调才能满足系统或应用程序的要求。然后,如果您返回第12页的表,您会看到他们所谓的寻址(通常称为从站ID)默认为247(未分配)。因此,您需要做的第一件事就是使用此Buslink PC工具将执行器上的地址设置为1到246之间的任何数字(如果您打算在总线上连接多个执行器,则必须设置一个不同的数字。每个执行器)。有关更多详细信息,请参见第28页。

成功完成该配置后,您应该能够运行演示主代码。例如,如果要将执行器移动10 mm,可以尝试:

## To install dependencies: ## sudo pip3 install modbus-tk ################################################################################################## import serial import fcntl import os import struct import termios import array #import modbus lib import modbus_tk import modbus_tk.defines as cst import modbus_tk.modbus as modbus #import modbus_tk.modbus_rtu as modbus_rtu from modbus_tk import modbus_rtu # RS485 ioctls define TIOCGRS485 = 0x542E TIOCSRS485 = 0x542F SER_RS485_ENABLED = 0b00000001 SER_RS485_RTS_ON_SEND = 0b00000010 SER_RS485_RTS_AFTER_SEND = 0b00000100 SER_RS485_RX_DURING_TX = 0b00010000 # rs 485 port ser1 = serial.Serial("/dev/ttySC0",19200) ser2 = serial.Serial("/dev/ttySC1",9600) def rs485_enable(): buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding #enable 485 chanel 1 fcntl.ioctl(ser1, TIOCGRS485, buf) buf[0] |= SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND buf[1] = 0 buf[2] = 0 fcntl.ioctl(ser1, TIOCSRS485, buf) #enable 485 chanel 2 fcntl.ioctl(ser2, TIOCGRS485, buf) buf[0] |= SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND buf[1] = 0 buf[2] = 0 fcntl.ioctl(ser2, TIOCSRS485, buf) #end of rs485_enable(): if __name__ == '__main__': logger = modbus_tk.utils.create_logger("console") rs485_enable() #set modbus master master = modbus_rtu.RtuMaster( serial.Serial(port= '/dev/ttySC0', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0) ) master.set_timeout(5.0) master.set_verbose(True) logger.info("connected") logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 1, output_value=100)) #Write target position 10mm (1/10mm*100) logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 2, output_value=1)) #Move actuator

请注意,我只更改了演示代码的最后两行。 cst.WRITE_SINGLE_REGISTER之前的第一个1必须与您使用BusLink PC Tool设置的地址从站相同。紧随其后的数字(第一行中的1和第二行中的2)是您需要根据手册第22页写入的寄存器号。最后,output_value是您需要在每个寄存器上写入的值。在编号为1的寄存器上,您需要写下要从其参考位置移动致动器的目标位置(以0.1mm的倍数测量),第二个位置只需写一个1(再次参见手册第22页的表,步骤2和3)。

您可以通过读取输入寄存器3和5来完成第4步的序列。注意,要读取输入寄存器的功能代码是cst.READ_INPUT_REGISTERS

试一下,看看是否可以使它工作。

完成后,我们可以查看minimalmodbus

EDIT:更好地了解您的硬件如何工作(请参见上面的编辑),现在您可以使用所需的任何Modbus库了,您只需要将演示代码保留在#end of rs485_enable():上方,并在开始发送数据之前在某处调用rs485_enable()

对于

minimalmodbus

,您可以尝试执行以下操作:import serial import fcntl import os import struct import termios import array #Remove modbus-tk imports and add minimalmodbus import minimalmodbus # only /dev/ttySC0 will be used # RS485 ioctls define TIOCGRS485 = 0x542E TIOCSRS485 = 0x542F SER_RS485_ENABLED = 0b00000001 SER_RS485_RTS_ON_SEND = 0b00000010 SER_RS485_RTS_AFTER_SEND = 0b00000100 SER_RS485_RX_DURING_TX = 0b00010000 # rs 485 port ser1 = serial.Serial("/dev/ttySC0",19200) def rs485_enable(): buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding #enable 485 chanel 1 fcntl.ioctl(ser1, TIOCGRS485, buf) buf[0] |= SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND buf[1] = 0 buf[2] = 0 fcntl.ioctl(ser1, TIOCSRS485, buf) #end of rs485_enable(): if __name__ == '__main__': actuator = minimalmodbus.Instrument('/dev/ttySC0', 1) # port name, slave address (in decimal), change according to actuator address rs485_enable() #you need to keep this for your hat to work #minimalmodbus setup actuator.serial.port # this is the serial port name actuator.serial.baudrate = 19200 # Baud rate actuator.serial.bytesize = 8 actuator.serial.parity = serial.PARITY_NONE actuator.serial.stopbits = 1 actuator.serial.timeout = 0.05 # seconds actuator.address # this is the slave (actuator) address number actuator.mode = minimalmodbus.MODE_RTU # rtu mode #write registers actuator.write_register(1, 100) #write target distance to move actuator.write_register(2, 1) #Move!
© www.soinside.com 2019 - 2024. All rights reserved.