目录
  • 使用pymodbus库进行modbus tcp通信
  • 创建modbus tcp server
  • 读写modbus变量
    • modbus变量类型以及地址
    • 读取常规变量
    • 读取复杂变量
  • 总结

    使用pymodbus库进行modbus tcp通信

    使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。

    本篇主要介绍使用pymodbus库进行modbus tcp仿真,实现pc端读取plc或工业设备modbus变量。

    安装pymodbus:

    pip install -U pymodbus

    创建modbus tcp server

    这里我们先创建一个虚拟的modbus设备,如果你手里有一个plc或者工业设备,可以直接跳过本节。

    • modbus_server.py
    '''
     * @Author: liuzhao 
     * @Last Modified time: 2022-10-05 09:56:13 
    '''
    
    from pymodbus.server.sync import (
        StartTcpServer,
    )
    from pymodbus.datastore import (
        ModbusSequentialDataBlock,
        ModbusServerContext,
        ModbusSlaveContext,
    )
    from pymodbus.version import version
    
    datablock = ModbusSequentialDataBlock.create()
    context = ModbusSlaveContext(
        di=datablock,
        co=datablock,
        hr=datablock,
        ir=datablock,
        )
    single = True
    
    # Build data storage
    store = ModbusServerContext(slaves=context, single=single)
    
    
    if __name__ == '__main__':
    
    	address = ("0.0.0.0", 503)
    	StartTcpServer(
    	    context=store,  # Data storage
    	    address=address,  # listen address
    	  	allow_reuse_address=True,  # allow the reuse of an address
    	)
    

    直接运行该脚本,就可以在本机的503端口创建一台modbus设备了,具体实现暂不深追,我们学习的重点是客户端对modbus变量的读写。

    读写modbus变量

    modbus变量类型以及地址

    Object type Access Size Address
    Coil Read-write 1 bit 00001 – 09999
    Discrete input Read-only 1 bit 10001 – 19999
    Input register Read-only 16 bits 30001 – 39999
    Holding register Read-write 16 bits 40001 – 49999

    coil是线圈,Discrete input是数字量输入,Input register是模拟量输入,Holding register是保持寄存器。一般地址范围是0-65535

    读取常规变量

    读写线圈 | 读取输入变量 | 读写保持寄存器

    from pymodbus.client.sync import ModbusTcpClient
    from pymodbus.bit_read_message import ReadCoilsResponse
    from pymodbus.register_read_message import ReadInputRegistersResponse
    from pymodbus.exceptions import ConnectionException      # 连接失败,用于异常处理
    
    host = '127.0.0.1'
    port = 503
    client = ModbusTcpClient(host,port)
    
    
    # 写入线圈
    client.write_coil(1, True)
    client.write_coil(2, False)
    client.write_coil(3, True)
    
    # 读取线圈    注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个
    result:ReadCoilsResponse = client.read_coils(address=1,cout=8)     # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题
    print(result.isError())
    
    # 不建议使用
    print(result.getBit(7))            # 这里的参数address不是plc里的地址,而是python列表的address,
    
    print('read_coils ')
    
    # 建议使用
    print(result.bits)        # 打印读取结果,一共8位
    # 读取其中的位
    print(                   
        result.bits[0],
        result.bits[1],
        result.bits[2]
        )         # 相当于result.getBit(0)
    
    
    # 读取数字输入
    result = client.read_discrete_inputs(address=10001,count=8)    # 从10001开始读,读取8位
    print(result.bits)
    
    
    # 读取模拟输入寄存器
    input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8)
    # print(f'is_error:{input_register_result.isError()}')
    print('read_input_registers ')
    print(input_register_result.registers)   
    print(input_register_result.getRegister(0))   
    
    
    # 读写保持寄存器
    client.write_register(address=40001,value=100)
    result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1)
    print('read_holding_registers ')
    print(result.registers)
    
    # 关闭连接
    client.close()
    
    

    读取复杂变量

    字符串、浮点数、负数等

    这里需要注意modbus设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。

    根据不同的设备,对照下表调整正确的组合方式。

    Word Order Byte order Word1 Word2
    Big Big 0x1234 0x5678
    Big Little 0x3412 0x7856
    Little Big 0x5678 0x1234
    Little Little 0x7856 0x3412
    # 复杂数据类型
    
    from collections import OrderedDict
    import logging
    
    from pymodbus.client.sync import ModbusTcpClient as ModbusClient
    
    from pymodbus.constants import Endian
    from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
    
    
    
    ORDER_DICT = {"<": "LITTLE", ">": "BIG"}
    
    
    def run_binary_payload_client(host:str,port:int):
      
        for word_endian, byte_endian in (
            (Endian.Big, Endian.Big),
            (Endian.Big, Endian.Little),
            (Endian.Little, Endian.Big),
            (Endian.Little, Endian.Little),
        ):
            print("-" * 60)
            print(f"Word Order: {ORDER_DICT[word_endian]}")
            print(f"Byte Order: {ORDER_DICT[byte_endian]}")
            print()
        
            builder = BinaryPayloadBuilder(
                wordorder=word_endian,
                byteorder=byte_endian,
            )
    		
    		# 写入的变量
            my_string = "abcd-efgh123345765432"
            builder.add_string(my_string)
            builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
            builder.add_8bit_int(-0x12)
            builder.add_8bit_uint(0x12)
            builder.add_16bit_int(-0x5678)
            builder.add_16bit_uint(0x1234)
            builder.add_32bit_int(-0x1234)
            builder.add_32bit_uint(0x12345678)
            builder.add_16bit_float(12.34)
            builder.add_16bit_float(-12.34)
            builder.add_32bit_float(22.34)
            builder.add_32bit_float(-22.34)
            builder.add_64bit_int(-0xDEADBEEF)
            builder.add_64bit_uint(0x12345678DEADBEEF)
            builder.add_64bit_uint(0x12345678DEADBEEF)
            builder.add_64bit_float(123.45)
            builder.add_64bit_float(-123.45)
            registers = builder.to_registers()
            print("Writing Registers:")
            print(registers)
            print("\n")
            payload = builder.build()
            address = 40001          # 从40001开始写入
            # We can write registers
            client.write_registers(address, registers, unit=1)    # 写入
         	
         	# 读取复杂变量
            print("Reading Registers:")
            address = 40001
    
            count = len(payload)
            print(f"payload_len {count}")
            result = client.read_holding_registers(address, count, slave=1)
            print(result.registers)
            print("\n")
            decoder = BinaryPayloadDecoder.fromRegisters(
                result.registers, byteorder=byte_endian, wordorder=word_endian
            )
            # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder
            assert (
                decoder._byteorder == builder._byteorder  # pylint: disable=protected-access
            )  # nosec
            assert (
                decoder._wordorder == builder._wordorder  # pylint: disable=protected-access
            )  # nosec
    
            decoded = OrderedDict(
                [
                    ("string", decoder.decode_string(len(my_string))),
                    ("bits", decoder.decode_bits()),
                    ("8int", decoder.decode_8bit_int()),
                    ("8uint", decoder.decode_8bit_uint()),
                    ("16int", decoder.decode_16bit_int()),
                    ("16uint", decoder.decode_16bit_uint()),
                    ("32int", decoder.decode_32bit_int()),
                    ("32uint", decoder.decode_32bit_uint()),
                    ("16float", decoder.decode_16bit_float()),
                    ("16float2", decoder.decode_16bit_float()),
                    ("32float", decoder.decode_32bit_float()),
                    ("32float2", decoder.decode_32bit_float()),
                    ("64int", decoder.decode_64bit_int()),
                    ("64uint", decoder.decode_64bit_uint()),
                    ("ignore", decoder.skip_bytes(8)),
                    ("64float", decoder.decode_64bit_float()),
                    ("64float2", decoder.decode_64bit_float()),
                ]
            )
            print("Decoded Data")
            for name, value in iter(decoded.items()):
                print(
                    "%s\t" % name,  # pylint: disable=consider-using-f-string
                    hex(value) if isinstance(value, int) else value,
                )
            print("\n")
    
    	# 关闭连接
        client.close()
    
    
    if __name__ == "__main__":
        run_binary_payload_client("127.0.0.1", 503)    

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持小闻网。

    声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。