储能系统架构详解
硬件架构
电池系统层级
电池系统层级结构:
系统(System):100MWh
└── 电池阵列(Array):10MWh × 10
└── 电池簇(Cluster/String):500kWh × 20
└── 电池包(Pack):50kWh × 10
└── 电池模组(Module):5kWh × 10
└── 电芯(Cell):3.2V × 100Ah × 16串
典型磷酸铁锂电芯参数:
├── 标称电压:3.2V
├── 充电截止:3.65V
├── 放电截止:2.5V
├── 容量:100-280Ah(不同规格)
└── 循环寿命:3000-6000次(80% DOD)电气拓扑
直流侧拓扑(集中式):
电池簇1 ──[簇级 BMS]──┐
电池簇2 ──[簇级 BMS]──┤
电池簇3 ──[簇级 BMS]──┤── 直流母线 ──[PCS]── 交流母线 ──[变压器]── 电网
... │
电池簇N ──[簇级 BMS]──┘
直流侧拓扑(分散式):
电池簇1 ──[PCS1]──┐
电池簇2 ──[PCS2]──┤── 交流母线 ──[变压器]── 电网
电池簇3 ──[PCS3]──┘
优缺点对比:
集中式:成本低,但单点故障影响大
分散式:可靠性高,灵活扩展,成本较高软件架构
EMS 软件架构
EMS 软件分层架构:
┌─────────────────────────────────────────────────────┐
│ 展示层 │
│ Web HMI │ 移动 App │ 大屏展示 │ 报表系统 │
├─────────────────────────────────────────────────────┤
│ 应用层 │
│ 调度执行 │ 策略引擎 │ 告警管理 │ 用户管理 │
├─────────────────────────────────────────────────────┤
│ 服务层 │
│ 数据服务 │ 规则引擎 │ 计算服务 │ 通知服务 │
├─────────────────────────────────────────────────────┤
│ 数据层 │
│ 时序数据库 │ 关系数据库 │ 消息队列 │ 缓存 │
├─────────────────────────────────────────────────────┤
│ 采集层 │
│ Modbus 驱动 │ IEC 104 │ CAN 驱动 │ 协议适配器 │
└─────────────────────────────────────────────────────┘数据流架构
python
# 储能 EMS 数据流设计
"""
数据流向:
设备 → 采集驱动 → 消息队列 → 数据处理 → 存储/展示
1. 采集层(1s 周期)
Modbus/CAN 驱动 → 原始数据 → Redis 实时缓存
2. 处理层(实时)
Redis → 规则引擎(告警判断)→ 告警队列
Redis → 计算引擎(SOC 估算)→ 计算结果
3. 存储层
原始数据 → TimescaleDB(时序数据库)
告警事件 → PostgreSQL(关系数据库)
实时状态 → Redis(缓存)
4. 上报层
TimescaleDB → MQTT → 云平台
PostgreSQL → IEC 104 → 调度中心
"""
import asyncio
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class DataPoint:
device_id: str
tag_name: str
value: float
quality: int
timestamp: float
class DataPipeline:
def __init__(self, redis_client, timescale_client, mqtt_client):
self.redis = redis_client
self.timescale = timescale_client
self.mqtt = mqtt_client
self.alarm_rules = AlarmRuleEngine()
async def process(self, data_point: DataPoint):
# 1. 更新实时缓存
await self.redis.hset(
f"realtime:{data_point.device_id}",
data_point.tag_name,
data_point.value
)
# 2. 告警检查
alarms = self.alarm_rules.check(data_point)
for alarm in alarms:
await self.handle_alarm(alarm)
# 3. 写入时序数据库
await self.timescale.insert(data_point)
# 4. 上报云平台(降频,每 10s 一次)
if self._should_report(data_point):
await self.mqtt.publish(
f"ess/{data_point.device_id}/telemetry",
data_point
)高可用架构
双机热备
EMS 高可用架构:
┌─────────────────┐
│ 负载均衡/VIP │
└────────┬────────┘
│
┌──────────────┴──────────────┐
│ │
┌─────────▼─────────┐ ┌──────────▼──────────┐
│ EMS 主节点 │ │ EMS 备节点 │
│ (Active) │◄─────►│ (Standby) │
│ 192.168.1.10 │ 心跳 │ 192.168.1.11 │
└─────────┬─────────┘ └──────────┬──────────┘
│ │
└──────────┬──────────────────┘
│
┌──────────▼──────────┐
│ 共享存储/数据库 │
│ TimescaleDB HA │
└─────────────────────┘
切换条件:
- 主节点心跳超时(30s)
- 主节点服务不可用
- 手动切换(维护)
切换时间:< 30s(业务中断时间)边缘侧离线缓存
python
class OfflineBuffer:
"""
网络中断时本地缓存数据,恢复后补传
适用于:EMS → 云平台 的 MQTT 连接
"""
def __init__(self, db_path: str, max_size_mb: int = 500):
self.db_path = db_path
self.max_size = max_size_mb * 1024 * 1024
self._init_db()
def _init_db(self):
import sqlite3
self.conn = sqlite3.connect(self.db_path)
self.conn.execute("""
CREATE TABLE IF NOT EXISTS buffer (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
payload BLOB NOT NULL,
timestamp REAL NOT NULL,
sent INTEGER DEFAULT 0
)
""")
self.conn.execute("CREATE INDEX IF NOT EXISTS idx_sent ON buffer(sent)")
self.conn.commit()
def store(self, topic: str, payload: bytes):
"""存储待发送消息"""
# 检查存储空间
size = os.path.getsize(self.db_path)
if size > self.max_size:
self._evict_oldest()
self.conn.execute(
"INSERT INTO buffer (topic, payload, timestamp) VALUES (?, ?, ?)",
(topic, payload, time.time())
)
self.conn.commit()
def get_pending(self, limit: int = 100):
"""获取待发送消息"""
cursor = self.conn.execute(
"SELECT id, topic, payload FROM buffer WHERE sent=0 "
"ORDER BY timestamp LIMIT ?",
(limit,)
)
return cursor.fetchall()
def mark_sent(self, ids: list):
"""标记已发送"""
placeholders = ','.join('?' * len(ids))
self.conn.execute(
f"UPDATE buffer SET sent=1 WHERE id IN ({placeholders})",
ids
)
self.conn.commit()
def _evict_oldest(self):
"""清理最旧的已发送消息"""
self.conn.execute(
"DELETE FROM buffer WHERE sent=1 AND "
"id IN (SELECT id FROM buffer WHERE sent=1 ORDER BY timestamp LIMIT 1000)"
)
self.conn.commit()安全架构
储能电站网络安全分区:
互联网
│
│ HTTPS/VPN
│
┌───▼────────────────────────────────────────────────┐
│ 安全区 I(管理信息区) │
│ 云平台接入 │ 远程运维 │ 数据上报 │
├────────────────────────────────────────────────────┤
│ 安全隔离装置(防火墙/正向隔离) │
├────────────────────────────────────────────────────┤
│ 安全区 II(生产控制区) │
│ EMS │ SCADA │ 历史数据库 │
├────────────────────────────────────────────────────┤
│ 安全隔离装置(纵向加密认证) │
├────────────────────────────────────────────────────┤
│ 安全区 III(实时控制区) │
│ PCS 控制 │ BMS 管理 │ 保护装置 │
└────────────────────────────────────────────────────┘
符合《电力监控系统安全防护规定》(发改委 14 号令)