IEC 104 最佳实践
连接参数配置
标准参数设置
python
# IEC 104 关键参数(IEC 60870-5-104 标准定义)
IEC104_PARAMS = {
# 发送窗口大小(未确认的 I 帧最大数量)
'k': 12, # 标准值 12,范围 1-32767
# 接收窗口大小(收到多少 I 帧后发送 S 帧确认)
'w': 8, # 标准值 8,范围 1-32767,建议 k*2/3
# 连接超时(TCP 连接建立超时)
't0': 30, # 秒,范围 1-255
# 发送超时(I 帧发出后等待确认的超时)
't1': 15, # 秒,范围 1-255
# 无数据确认超时(收到 I 帧后发送 S 帧的最大等待时间)
't2': 10, # 秒,范围 1-255,必须 < t1
# 测试帧超时(TESTFR 发出后等待响应的超时)
't3': 20, # 秒,范围 1-172800(48小时)
}
# 储能电站推荐配置(调度专线,延迟低)
ESS_PARAMS = {
'k': 12, 'w': 8,
't0': 30, 't1': 15, 't2': 10, 't3': 20
}
# 4G/互联网连接推荐配置(延迟高,需要更宽松的超时)
MOBILE_PARAMS = {
'k': 12, 'w': 8,
't0': 60, 't1': 30, 't2': 20, 't3': 60
}数据上送策略
变化上送 vs 周期上送
python
class DataUploadStrategy:
"""
IEC 104 数据上送策略:
- 遥信:变化时立即上送(COT=3 突发)
- 重要遥测:变化超过死区时上送(COT=3)
- 所有遥测:周期性上送(COT=1)
"""
def __init__(self, connection):
self.connection = connection
self.last_values = {}
self.deadbands = {
'power': 1.0, # 功率死区:1kW
'voltage': 0.5, # 电压死区:0.5V
'current': 0.1, # 电流死区:0.1A
'soc': 0.5, # SOC 死区:0.5%
'temperature': 0.5, # 温度死区:0.5°C
}
def check_and_send(self, tag_name: str, ioa: int,
value: float, tag_type: str):
last = self.last_values.get(tag_name)
deadband = self.deadbands.get(tag_type, 0.1)
if last is None or abs(value - last) >= deadband:
self.send_measurement(ioa, value, cot=CauseOfTransmission.SPONTANEOUS)
self.last_values[tag_name] = value
def send_periodic(self, measurements: dict):
"""周期性上送所有遥测(COT=1)"""
for ioa, value in measurements.items():
self.send_measurement(ioa, value, cot=CauseOfTransmission.PERIODIC)
def send_measurement(self, ioa: int, value: float, cot):
asdu = ASDU.create(
TypeID.M_ME_TF_1, # 带时标的短浮点
False, cot, 0, self.ca, False, False
)
# 添加 CP56Time2a 时标
timestamp = CP56Time2a.createFromMsTimestamp(
int(time.time() * 1000)
)
io = MeasuredValueShortWithCP56Time2a.create(ioa, value, 0, timestamp)
asdu.addInformationObject(io)
self.connection.sendASDU(asdu)总召唤响应
python
def handle_general_interrogation(self, asdu):
"""响应总召唤:上送所有当前值"""
ca = asdu.getCA()
# 1. 发送激活确认
self.send_activation_confirm(TypeID.C_IC_NA_1, ca)
# 2. 批量上送所有遥信(COT=20 响应站召唤)
self.send_all_single_points(ca, cot=CauseOfTransmission.INTERROGATED_BY_STATION)
# 3. 批量上送所有遥测
self.send_all_measurements(ca, cot=CauseOfTransmission.INTERROGATED_BY_STATION)
# 4. 发送召唤结束
self.send_interrogation_end(ca)
def send_all_measurements(self, ca: int, cot, batch_size: int = 50):
"""分批上送遥测,避免单个 ASDU 过大"""
measurements = self.get_all_measurements()
items = list(measurements.items())
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
asdu = ASDU.create(TypeID.M_ME_NC_1, True, cot, 0, ca, False, False)
for ioa, value in batch:
io = MeasuredValueShort.create(ioa, value, 0)
asdu.addInformationObject(io)
self.connection.sendASDU(asdu)遥控安全机制
python
class SecureRemoteControl:
"""
IEC 104 遥控安全实现:
1. 选择-执行双步操作
2. 超时自动撤销
3. 操作日志记录
4. 权限验证
"""
SELECT_TIMEOUT = 30 # 选择后 30 秒内必须执行,否则自动撤销
def __init__(self):
self.pending_selects = {} # {ioa: (value, select_time)}
def handle_command(self, asdu):
type_id = asdu.getTypeID()
cot = asdu.getCOT()
io = asdu.getElement(0)
ioa = io.getObjectAddress()
if cot == CauseOfTransmission.ACTIVATION:
if self._is_select(io):
self._handle_select(asdu, ioa, io)
else:
self._handle_execute(asdu, ioa, io)
elif cot == CauseOfTransmission.DEACTIVATION:
self._handle_cancel(asdu, ioa)
def _handle_select(self, asdu, ioa, io):
"""处理选择命令"""
value = io.getState()
# 权限检查
if not self._check_permission(ioa, 'select'):
self._send_negative_confirm(asdu, ioa)
return
# 记录选择状态
self.pending_selects[ioa] = (value, time.time())
# 发送选择确认
self._send_activation_confirm(asdu, ioa, value)
# 启动超时定时器
threading.Timer(
self.SELECT_TIMEOUT,
self._auto_cancel,
args=[ioa]
).start()
self._log_operation('SELECT', ioa, value)
def _handle_execute(self, asdu, ioa, io):
"""处理执行命令"""
if ioa not in self.pending_selects:
# 未经选择直接执行,拒绝
self._send_negative_confirm(asdu, ioa)
return
selected_value, select_time = self.pending_selects[ioa]
execute_value = io.getState()
# 验证执行值与选择值一致
if execute_value != selected_value:
self._send_negative_confirm(asdu, ioa)
return
# 执行控制操作
success = self._execute_control(ioa, execute_value)
if success:
del self.pending_selects[ioa]
self._send_activation_confirm(asdu, ioa, execute_value)
self._send_activation_termination(asdu, ioa, execute_value)
self._log_operation('EXECUTE', ioa, execute_value)
else:
self._send_negative_confirm(asdu, ioa)
def _auto_cancel(self, ioa):
"""超时自动撤销"""
if ioa in self.pending_selects:
del self.pending_selects[ioa]
self._log_operation('AUTO_CANCEL', ioa, None)时钟同步
python
def handle_clock_sync(self, asdu):
"""处理时钟同步命令"""
io = asdu.getElement(0)
master_time = io.getTimestamp()
# 计算时间偏差
local_time_ms = int(time.time() * 1000)
master_time_ms = master_time.toMsTimestamp()
offset_ms = master_time_ms - local_time_ms
if abs(offset_ms) > 1000: # 偏差超过 1 秒才同步
# 同步系统时间(需要 root 权限)
import subprocess
new_time = datetime.fromtimestamp(master_time_ms / 1000)
subprocess.run(['date', '-s', new_time.strftime('%Y-%m-%d %H:%M:%S')])
logging.info(f"Clock synchronized, offset was {offset_ms}ms")
# 发送时钟同步确认
self._send_clock_sync_confirm(asdu)
def send_clock_sync_request(self):
"""主站发送时钟同步"""
asdu = ASDU.create(TypeID.C_CS_NA_1, False,
CauseOfTransmission.ACTIVATION, 0, self.ca, False, False)
timestamp = CP56Time2a.createFromMsTimestamp(int(time.time() * 1000))
io = ClockSynchronizationCommand.create(0, timestamp)
asdu.addInformationObject(io)
self.connection.sendASDU(asdu)冗余连接
python
class RedundantIEC104Client:
"""
IEC 104 冗余连接:
主用/备用双通道,主用故障时自动切换
"""
def __init__(self, primary_host, backup_host, port=2404):
self.primary = IEC104Client(primary_host, port)
self.backup = IEC104Client(backup_host, port)
self.active = self.primary
self.is_primary_active = True
def connect(self):
# 优先连接主用通道
try:
self.primary.connect()
self.active = self.primary
logging.info("Connected via primary channel")
except Exception as e:
logging.warning(f"Primary failed: {e}, trying backup")
self.backup.connect()
self.active = self.backup
self.is_primary_active = False
def _monitor_connection(self):
"""监控连接状态,主用恢复时切回"""
while True:
time.sleep(30)
if not self.is_primary_active:
try:
self.primary.connect()
# 主用恢复,切换回主用
old_backup = self.active
self.active = self.primary
self.is_primary_active = True
old_backup.disconnect()
logging.info("Switched back to primary channel")
except Exception:
pass # 主用仍不可用,继续使用备用合规性要求
电力行业 IEC 104 合规要点:
1. 安全防护
- 必须部署在电力调度数据网(专用网络)
- 禁止与互联网直接互联
- 需通过电力专用纵向加密认证装置
- 符合《电力监控系统安全防护规定》(国家发改委 14 号令)
2. 时间同步
- 子站时钟精度要求:≤ 1ms(GPS/北斗对时)
- 所有带时标的数据必须使用同步后的时间
3. 数据质量
- 遥测数据需包含质量描述符(QDS)
- 无效数据必须标记 Invalid 位
- 通信中断时数据标记为 Non-topical
4. 测试认证
- 需通过电力行业标准测试(IEC 60870-5-104 一致性测试)
- 与调度系统联调测试