Skip to content

Modbus RTU/TCP 最佳实践

RS-485 硬件接线规范

接线要点

正确的 RS-485 总线接线:

主站                从站1              从站2              从站3
  │                  │                  │                  │
  A(+)───────────────A(+)───────────────A(+)───────────────A(+)
  B(-)───────────────B(-)───────────────B(-)───────────────B(-)
  GND────────────────GND────────────────GND────────────────GND
  │                                                         │
 120Ω                                                      120Ω
(终端电阻)                                            (终端电阻)

关键规则:
1. 总线两端(最远两个节点)各接一个 120Ω 终端电阻
2. 中间节点不接终端电阻
3. 使用双绞屏蔽线(推荐 RVSP 2×0.5mm²)
4. 屏蔽层在主站侧单点接地,避免地环路
5. 总线长度 ≤ 1200m(9600bps),速率越高距离越短
6. 节点数 ≤ 32(标准 RS-485),使用中继器可扩展

防雷与隔离

工业现场必须考虑:

主站 ──[光电隔离]──[RS-485 收发器]──[TVS 防雷管]── 总线

                                   GND(现场地)

推荐器件:
- 隔离芯片:ADM2587E(带隔离的 RS-485 收发器)
- 防雷管:SMBJ6.5A(双向 TVS)
- 共模扼流圈:ACM2012-900-2P-T

轮询策略优化

分组轮询

python
import asyncio
from dataclasses import dataclass
from typing import List, Dict
import time

@dataclass
class RegisterGroup:
    """连续寄存器组,一次读取"""
    start_address: int
    count: int
    unit_id: int
    name: str
    interval: float  # 轮询间隔(秒)

class OptimizedModbusPoller:
    """
    优化的 Modbus 轮询器:
    1. 合并连续寄存器为一次读取
    2. 按优先级分配轮询频率
    3. 异常时自动降频
    """

    def __init__(self, client):
        self.client = client
        self.groups: List[RegisterGroup] = []
        self.last_poll: Dict[str, float] = {}
        self.error_count: Dict[str, int] = {}

    def add_group(self, group: RegisterGroup):
        self.groups.append(group)

    async def poll_loop(self):
        while True:
            now = time.time()
            for group in self.groups:
                last = self.last_poll.get(group.name, 0)
                # 错误时降频(指数退避)
                errors = self.error_count.get(group.name, 0)
                effective_interval = group.interval * (2 ** min(errors, 4))

                if now - last >= effective_interval:
                    await self._poll_group(group)
                    self.last_poll[group.name] = now

            await asyncio.sleep(0.05)  # 50ms 调度精度

    async def _poll_group(self, group: RegisterGroup):
        try:
            result = self.client.read_holding_registers(
                address=group.start_address,
                count=group.count,
                slave=group.unit_id
            )
            if not result.isError():
                self.error_count[group.name] = 0
                await self._process_result(group, result.registers)
            else:
                self._handle_error(group, str(result))
        except Exception as e:
            self._handle_error(group, str(e))

    def _handle_error(self, group: RegisterGroup, error: str):
        self.error_count[group.name] = self.error_count.get(group.name, 0) + 1
        logging.warning(f"Poll error for {group.name}: {error}, "
                       f"count={self.error_count[group.name]}")

# 使用示例:BMS 数据采集
poller = OptimizedModbusPoller(client)

# 高频数据:电压电流(1秒)
poller.add_group(RegisterGroup(0, 20, 1, "bms_voltage_current", 1.0))

# 中频数据:温度(5秒)
poller.add_group(RegisterGroup(100, 16, 1, "bms_temperature", 5.0))

# 低频数据:SOC/SOH(30秒)
poller.add_group(RegisterGroup(200, 4, 1, "bms_soc_soh", 30.0))

寄存器合并优化

python
def merge_register_requests(addresses: List[int], max_gap: int = 5) -> List[tuple]:
    """
    将离散的寄存器地址合并为连续读取请求
    max_gap: 允许的最大空洞(填充读取,减少请求次数)

    示例:
    输入:[0, 1, 2, 5, 6, 10, 11, 12]
    输出:[(0, 3), (5, 2), (10, 3)]  # (起始地址, 数量)
    """
    if not addresses:
        return []

    sorted_addrs = sorted(set(addresses))
    groups = []
    start = sorted_addrs[0]
    end = sorted_addrs[0]

    for addr in sorted_addrs[1:]:
        if addr - end <= max_gap + 1:
            end = addr
        else:
            groups.append((start, end - start + 1))
            start = addr
            end = addr

    groups.append((start, end - start + 1))
    return groups

超时与重试策略

python
import time
from functools import wraps

def modbus_retry(max_retries=3, delay=0.5, backoff=2.0):
    """Modbus 操作重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            current_delay = delay

            for attempt in range(max_retries):
                try:
                    result = func(*args, **kwargs)
                    if hasattr(result, 'isError') and result.isError():
                        raise ModbusException(f"Modbus error: {result}")
                    return result
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        logging.warning(
                            f"Attempt {attempt+1} failed: {e}, "
                            f"retrying in {current_delay:.1f}s"
                        )
                        time.sleep(current_delay)
                        current_delay *= backoff

            raise last_exception
        return wrapper
    return decorator

@modbus_retry(max_retries=3, delay=0.5)
def read_registers(client, address, count, unit_id):
    return client.read_holding_registers(address, count, slave=unit_id)

数据质量管理

python
from enum import IntEnum
from dataclasses import dataclass
from typing import Optional

class DataQuality(IntEnum):
    GOOD = 0
    UNCERTAIN = 1
    BAD = 2
    COMM_FAILURE = 3

@dataclass
class TagValue:
    value: Optional[float]
    quality: DataQuality
    timestamp: float
    raw_value: Optional[int] = None

class QualityAwareReader:
    """带数据质量标记的 Modbus 读取器"""

    def __init__(self, client, max_age: float = 30.0):
        self.client = client
        self.max_age = max_age  # 数据最大有效期(秒)
        self.cache: Dict[str, TagValue] = {}

    def read_tag(self, tag_name: str, address: int,
                 unit_id: int, scale: float = 1.0) -> TagValue:
        try:
            result = self.client.read_holding_registers(
                address=address, count=1, slave=unit_id
            )

            if result.isError():
                return TagValue(
                    value=None,
                    quality=DataQuality.BAD,
                    timestamp=time.time()
                )

            raw = result.registers[0]
            value = raw * scale

            # 范围检查
            quality = DataQuality.GOOD
            if not self._is_in_range(tag_name, value):
                quality = DataQuality.UNCERTAIN

            tag_value = TagValue(
                value=value,
                quality=quality,
                timestamp=time.time(),
                raw_value=raw
            )
            self.cache[tag_name] = tag_value
            return tag_value

        except Exception as e:
            logging.error(f"Read {tag_name} failed: {e}")
            return TagValue(
                value=None,
                quality=DataQuality.COMM_FAILURE,
                timestamp=time.time()
            )

    def get_cached(self, tag_name: str) -> TagValue:
        """获取缓存值,超时则标记为不确定"""
        cached = self.cache.get(tag_name)
        if cached is None:
            return TagValue(None, DataQuality.BAD, 0)

        age = time.time() - cached.timestamp
        if age > self.max_age:
            return TagValue(
                cached.value,
                DataQuality.UNCERTAIN,
                cached.timestamp
            )
        return cached

Modbus TCP 连接池

python
import threading
from queue import Queue, Empty

class ModbusTCPConnectionPool:
    """Modbus TCP 连接池,避免频繁建立/断开连接"""

    def __init__(self, host: str, port: int = 502,
                 pool_size: int = 5, timeout: float = 3.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self._pool: Queue = Queue(maxsize=pool_size)
        self._lock = threading.Lock()

        # 预创建连接
        for _ in range(pool_size):
            conn = self._create_connection()
            if conn:
                self._pool.put(conn)

    def _create_connection(self):
        from pymodbus.client import ModbusTcpClient
        client = ModbusTcpClient(
            host=self.host,
            port=self.port,
            timeout=self.timeout
        )
        if client.connect():
            return client
        return None

    def acquire(self, timeout: float = 5.0):
        """获取连接"""
        try:
            conn = self._pool.get(timeout=timeout)
            # 检查连接是否仍然有效
            if not conn.is_socket_open():
                conn.connect()
            return conn
        except Empty:
            raise TimeoutError("No available Modbus connections")

    def release(self, conn):
        """归还连接"""
        try:
            self._pool.put_nowait(conn)
        except Exception:
            conn.close()

    def __enter__(self):
        self._conn = self.acquire()
        return self._conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release(self._conn)

# 使用示例
pool = ModbusTCPConnectionPool("192.168.1.100", pool_size=3)

with pool as client:
    result = client.read_holding_registers(0, 10, slave=1)

安全注意事项

Modbus 协议本身无安全机制,工业现场需要:

1. 网络隔离
   - OT 网络与 IT 网络物理隔离或防火墙隔离
   - 只允许授权的 SCADA/HMI 访问 Modbus TCP 端口 502
   - 使用工业防火墙(如 Tofino)进行深度包检测

2. 访问控制
   - 防火墙白名单:只允许特定 IP 访问 502 端口
   - 使用 VPN 进行远程访问
   - 禁止从互联网直接访问 Modbus 设备

3. 写操作保护
   - 对写功能码(05/06/0F/10)进行额外验证
   - 关键设备(如 PCS 控制)增加硬件联锁
   - 记录所有写操作审计日志

4. Modbus TCP over TLS(新标准)
   - 使用 Modbus Security(端口 802)
   - 基于 TLS 1.2+ 加密传输

褚成志的IoT笔记