Skip to content

IEC 104 最佳实践

连接参数配置

标准参数设置

python
# IEC 104 关键参数(IEC 60870-5-104 标准定义)

IEC104_PARAMS = {
    # 发送窗口大小(未确认的 I 帧最大数量)
    'k': 12,          # 标准值 12,范围 1-32767

    # 接收窗口大小(收到多少 I 帧后发送 S 帧确认)
    'w': 8,           # 标准值 8,范围 1-32767,建议 k*2/3

    # 连接超时(TCP 连接建立超时)
    't0': 30,         # 秒,范围 1-255

    # 发送超时(I 帧发出后等待确认的超时)
    't1': 15,         # 秒,范围 1-255

    # 无数据确认超时(收到 I 帧后发送 S 帧的最大等待时间)
    't2': 10,         # 秒,范围 1-255,必须 < t1

    # 测试帧超时(TESTFR 发出后等待响应的超时)
    't3': 20,         # 秒,范围 1-172800(48小时)
}

# 储能电站推荐配置(调度专线,延迟低)
ESS_PARAMS = {
    'k': 12, 'w': 8,
    't0': 30, 't1': 15, 't2': 10, 't3': 20
}

# 4G/互联网连接推荐配置(延迟高,需要更宽松的超时)
MOBILE_PARAMS = {
    'k': 12, 'w': 8,
    't0': 60, 't1': 30, 't2': 20, 't3': 60
}

数据上送策略

变化上送 vs 周期上送

python
class DataUploadStrategy:
    """
    IEC 104 数据上送策略:
    - 遥信:变化时立即上送(COT=3 突发)
    - 重要遥测:变化超过死区时上送(COT=3)
    - 所有遥测:周期性上送(COT=1)
    """

    def __init__(self, connection):
        self.connection = connection
        self.last_values = {}
        self.deadbands = {
            'power': 1.0,        # 功率死区:1kW
            'voltage': 0.5,      # 电压死区:0.5V
            'current': 0.1,      # 电流死区:0.1A
            'soc': 0.5,          # SOC 死区:0.5%
            'temperature': 0.5,  # 温度死区:0.5°C
        }

    def check_and_send(self, tag_name: str, ioa: int,
                       value: float, tag_type: str):
        last = self.last_values.get(tag_name)
        deadband = self.deadbands.get(tag_type, 0.1)

        if last is None or abs(value - last) >= deadband:
            self.send_measurement(ioa, value, cot=CauseOfTransmission.SPONTANEOUS)
            self.last_values[tag_name] = value

    def send_periodic(self, measurements: dict):
        """周期性上送所有遥测(COT=1)"""
        for ioa, value in measurements.items():
            self.send_measurement(ioa, value, cot=CauseOfTransmission.PERIODIC)

    def send_measurement(self, ioa: int, value: float, cot):
        asdu = ASDU.create(
            TypeID.M_ME_TF_1,  # 带时标的短浮点
            False, cot, 0, self.ca, False, False
        )
        # 添加 CP56Time2a 时标
        timestamp = CP56Time2a.createFromMsTimestamp(
            int(time.time() * 1000)
        )
        io = MeasuredValueShortWithCP56Time2a.create(ioa, value, 0, timestamp)
        asdu.addInformationObject(io)
        self.connection.sendASDU(asdu)

总召唤响应

python
def handle_general_interrogation(self, asdu):
    """响应总召唤:上送所有当前值"""
    ca = asdu.getCA()

    # 1. 发送激活确认
    self.send_activation_confirm(TypeID.C_IC_NA_1, ca)

    # 2. 批量上送所有遥信(COT=20 响应站召唤)
    self.send_all_single_points(ca, cot=CauseOfTransmission.INTERROGATED_BY_STATION)

    # 3. 批量上送所有遥测
    self.send_all_measurements(ca, cot=CauseOfTransmission.INTERROGATED_BY_STATION)

    # 4. 发送召唤结束
    self.send_interrogation_end(ca)

def send_all_measurements(self, ca: int, cot, batch_size: int = 50):
    """分批上送遥测,避免单个 ASDU 过大"""
    measurements = self.get_all_measurements()
    items = list(measurements.items())

    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        asdu = ASDU.create(TypeID.M_ME_NC_1, True, cot, 0, ca, False, False)
        for ioa, value in batch:
            io = MeasuredValueShort.create(ioa, value, 0)
            asdu.addInformationObject(io)
        self.connection.sendASDU(asdu)

遥控安全机制

python
class SecureRemoteControl:
    """
    IEC 104 遥控安全实现:
    1. 选择-执行双步操作
    2. 超时自动撤销
    3. 操作日志记录
    4. 权限验证
    """

    SELECT_TIMEOUT = 30  # 选择后 30 秒内必须执行,否则自动撤销

    def __init__(self):
        self.pending_selects = {}  # {ioa: (value, select_time)}

    def handle_command(self, asdu):
        type_id = asdu.getTypeID()
        cot = asdu.getCOT()
        io = asdu.getElement(0)
        ioa = io.getObjectAddress()

        if cot == CauseOfTransmission.ACTIVATION:
            if self._is_select(io):
                self._handle_select(asdu, ioa, io)
            else:
                self._handle_execute(asdu, ioa, io)

        elif cot == CauseOfTransmission.DEACTIVATION:
            self._handle_cancel(asdu, ioa)

    def _handle_select(self, asdu, ioa, io):
        """处理选择命令"""
        value = io.getState()

        # 权限检查
        if not self._check_permission(ioa, 'select'):
            self._send_negative_confirm(asdu, ioa)
            return

        # 记录选择状态
        self.pending_selects[ioa] = (value, time.time())

        # 发送选择确认
        self._send_activation_confirm(asdu, ioa, value)

        # 启动超时定时器
        threading.Timer(
            self.SELECT_TIMEOUT,
            self._auto_cancel,
            args=[ioa]
        ).start()

        self._log_operation('SELECT', ioa, value)

    def _handle_execute(self, asdu, ioa, io):
        """处理执行命令"""
        if ioa not in self.pending_selects:
            # 未经选择直接执行,拒绝
            self._send_negative_confirm(asdu, ioa)
            return

        selected_value, select_time = self.pending_selects[ioa]
        execute_value = io.getState()

        # 验证执行值与选择值一致
        if execute_value != selected_value:
            self._send_negative_confirm(asdu, ioa)
            return

        # 执行控制操作
        success = self._execute_control(ioa, execute_value)

        if success:
            del self.pending_selects[ioa]
            self._send_activation_confirm(asdu, ioa, execute_value)
            self._send_activation_termination(asdu, ioa, execute_value)
            self._log_operation('EXECUTE', ioa, execute_value)
        else:
            self._send_negative_confirm(asdu, ioa)

    def _auto_cancel(self, ioa):
        """超时自动撤销"""
        if ioa in self.pending_selects:
            del self.pending_selects[ioa]
            self._log_operation('AUTO_CANCEL', ioa, None)

时钟同步

python
def handle_clock_sync(self, asdu):
    """处理时钟同步命令"""
    io = asdu.getElement(0)
    master_time = io.getTimestamp()

    # 计算时间偏差
    local_time_ms = int(time.time() * 1000)
    master_time_ms = master_time.toMsTimestamp()
    offset_ms = master_time_ms - local_time_ms

    if abs(offset_ms) > 1000:  # 偏差超过 1 秒才同步
        # 同步系统时间(需要 root 权限)
        import subprocess
        new_time = datetime.fromtimestamp(master_time_ms / 1000)
        subprocess.run(['date', '-s', new_time.strftime('%Y-%m-%d %H:%M:%S')])
        logging.info(f"Clock synchronized, offset was {offset_ms}ms")

    # 发送时钟同步确认
    self._send_clock_sync_confirm(asdu)

def send_clock_sync_request(self):
    """主站发送时钟同步"""
    asdu = ASDU.create(TypeID.C_CS_NA_1, False,
                       CauseOfTransmission.ACTIVATION, 0, self.ca, False, False)
    timestamp = CP56Time2a.createFromMsTimestamp(int(time.time() * 1000))
    io = ClockSynchronizationCommand.create(0, timestamp)
    asdu.addInformationObject(io)
    self.connection.sendASDU(asdu)

冗余连接

python
class RedundantIEC104Client:
    """
    IEC 104 冗余连接:
    主用/备用双通道,主用故障时自动切换
    """

    def __init__(self, primary_host, backup_host, port=2404):
        self.primary = IEC104Client(primary_host, port)
        self.backup = IEC104Client(backup_host, port)
        self.active = self.primary
        self.is_primary_active = True

    def connect(self):
        # 优先连接主用通道
        try:
            self.primary.connect()
            self.active = self.primary
            logging.info("Connected via primary channel")
        except Exception as e:
            logging.warning(f"Primary failed: {e}, trying backup")
            self.backup.connect()
            self.active = self.backup
            self.is_primary_active = False

    def _monitor_connection(self):
        """监控连接状态,主用恢复时切回"""
        while True:
            time.sleep(30)
            if not self.is_primary_active:
                try:
                    self.primary.connect()
                    # 主用恢复,切换回主用
                    old_backup = self.active
                    self.active = self.primary
                    self.is_primary_active = True
                    old_backup.disconnect()
                    logging.info("Switched back to primary channel")
                except Exception:
                    pass  # 主用仍不可用,继续使用备用

合规性要求

电力行业 IEC 104 合规要点:

1. 安全防护
   - 必须部署在电力调度数据网(专用网络)
   - 禁止与互联网直接互联
   - 需通过电力专用纵向加密认证装置
   - 符合《电力监控系统安全防护规定》(国家发改委 14 号令)

2. 时间同步
   - 子站时钟精度要求:≤ 1ms(GPS/北斗对时)
   - 所有带时标的数据必须使用同步后的时间

3. 数据质量
   - 遥测数据需包含质量描述符(QDS)
   - 无效数据必须标记 Invalid 位
   - 通信中断时数据标记为 Non-topical

4. 测试认证
   - 需通过电力行业标准测试(IEC 60870-5-104 一致性测试)
   - 与调度系统联调测试

褚成志的IoT笔记