储能数据采集与监控(SCADA)
SCADA 系统架构
储能 SCADA 系统组成:
数据采集层:
├── 实时数据库(Redis):毫秒级实时数据
├── 时序数据库(TimescaleDB/InfluxDB):历史数据
└── 关系数据库(PostgreSQL):配置、告警、报表
展示层:
├── Web HMI(Vue.js + ECharts)
├── 移动端 App
└── 大屏展示系统
告警系统:
├── 实时告警引擎
├── 告警推送(短信/邮件/企业微信)
└── 告警历史记录
报表系统:
├── 日/月/年报表
├── 充放电统计
└── 效率分析时序数据库选型
TimescaleDB(推荐)
sql
-- 创建储能数据表(TimescaleDB 超表)
CREATE TABLE ess_telemetry (
time TIMESTAMPTZ NOT NULL,
site_id VARCHAR(50) NOT NULL,
device_id VARCHAR(100) NOT NULL,
tag_name VARCHAR(100) NOT NULL,
value DOUBLE PRECISION,
quality SMALLINT DEFAULT 0
);
-- 转换为超表(按时间自动分区)
SELECT create_hypertable('ess_telemetry', 'time',
chunk_time_interval => INTERVAL '1 day');
-- 创建索引
CREATE INDEX ON ess_telemetry (site_id, device_id, tag_name, time DESC);
-- 启用压缩(7天后自动压缩)
ALTER TABLE ess_telemetry SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'site_id, device_id, tag_name'
);
SELECT add_compression_policy('ess_telemetry', INTERVAL '7 days');
-- 数据保留策略(保留 2 年)
SELECT add_retention_policy('ess_telemetry', INTERVAL '2 years');sql
-- 常用查询
-- 最新值
SELECT DISTINCT ON (device_id, tag_name)
device_id, tag_name, value, time
FROM ess_telemetry
WHERE site_id = 'site001'
ORDER BY device_id, tag_name, time DESC;
-- 1分钟均值(降采样)
SELECT
time_bucket('1 minute', time) AS bucket,
device_id,
tag_name,
AVG(value) AS avg_value,
MAX(value) AS max_value,
MIN(value) AS min_value
FROM ess_telemetry
WHERE site_id = 'site001'
AND time >= NOW() - INTERVAL '1 hour'
GROUP BY bucket, device_id, tag_name
ORDER BY bucket;
-- 连续聚合视图(预计算,提升查询性能)
CREATE MATERIALIZED VIEW ess_telemetry_1min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', time) AS bucket,
site_id, device_id, tag_name,
AVG(value) AS avg_val,
MAX(value) AS max_val,
MIN(value) AS min_val,
COUNT(*) AS sample_count
FROM ess_telemetry
GROUP BY bucket, site_id, device_id, tag_name;
SELECT add_continuous_aggregate_policy('ess_telemetry_1min',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '1 minute');告警引擎
python
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional, Callable
import time
class AlarmLevel(IntEnum):
INFO = 1
WARNING = 2
ALARM = 3
CRITICAL = 4
@dataclass
class AlarmRule:
rule_id: str
device_id: str
tag_name: str
condition: str # 'gt', 'lt', 'eq', 'ne'
threshold: float
level: AlarmLevel
message_template: str
delay_seconds: float = 0 # 延迟确认(避免抖动)
recovery_threshold: Optional[float] = None
class AlarmEngine:
def __init__(self, rules: list, notify_callback: Callable):
self.rules = {r.rule_id: r for r in rules}
self.notify = notify_callback
self.active_alarms = {} # {rule_id: alarm_start_time}
self.pending_alarms = {} # {rule_id: first_trigger_time}
def check(self, device_id: str, tag_name: str, value: float):
for rule in self.rules.values():
if rule.device_id != device_id or rule.tag_name != tag_name:
continue
triggered = self._evaluate(value, rule.condition, rule.threshold)
if triggered:
self._handle_trigger(rule, value)
else:
self._handle_recovery(rule, value)
def _evaluate(self, value: float, condition: str, threshold: float) -> bool:
ops = {
'gt': value > threshold,
'gte': value >= threshold,
'lt': value < threshold,
'lte': value <= threshold,
'eq': abs(value - threshold) < 1e-6,
}
return ops.get(condition, False)
def _handle_trigger(self, rule: AlarmRule, value: float):
now = time.time()
if rule.rule_id not in self.pending_alarms:
self.pending_alarms[rule.rule_id] = now
# 延迟确认:持续触发超过 delay_seconds 才告警
if (now - self.pending_alarms[rule.rule_id] >= rule.delay_seconds
and rule.rule_id not in self.active_alarms):
self.active_alarms[rule.rule_id] = now
alarm = {
'rule_id': rule.rule_id,
'device_id': rule.device_id,
'tag_name': rule.tag_name,
'value': value,
'threshold': rule.threshold,
'level': rule.level,
'message': rule.message_template.format(value=value),
'timestamp': now
}
self.notify(alarm)
def _handle_recovery(self, rule: AlarmRule, value: float):
self.pending_alarms.pop(rule.rule_id, None)
if rule.rule_id in self.active_alarms:
# 检查恢复阈值(防止在阈值附近反复告警)
if rule.recovery_threshold is not None:
if not self._evaluate(value, rule.condition, rule.recovery_threshold):
del self.active_alarms[rule.rule_id]
self.notify({
'rule_id': rule.rule_id,
'type': 'recovery',
'timestamp': time.time()
})
else:
del self.active_alarms[rule.rule_id]
# 配置示例
rules = [
AlarmRule(
rule_id="bms_rack01_overvoltage",
device_id="bms_rack01",
tag_name="max_cell_voltage",
condition="gt",
threshold=3.65,
level=AlarmLevel.CRITICAL,
message_template="电芯过压告警:{value:.3f}V",
delay_seconds=3,
recovery_threshold=3.60
),
AlarmRule(
rule_id="bms_rack01_high_temp",
device_id="bms_rack01",
tag_name="max_temperature",
condition="gt",
threshold=45.0,
level=AlarmLevel.WARNING,
message_template="电池温度偏高:{value:.1f}°C",
delay_seconds=10,
recovery_threshold=40.0
),
]Grafana 监控面板
json
// Grafana Dashboard 配置片段(储能系统概览)
{
"panels": [
{
"title": "系统 SOC",
"type": "gauge",
"datasource": "TimescaleDB",
"targets": [{
"rawSql": "SELECT time, value FROM ess_telemetry WHERE device_id='system' AND tag_name='soc' ORDER BY time DESC LIMIT 1",
"format": "time_series"
}],
"fieldConfig": {
"defaults": {
"min": 0, "max": 100,
"unit": "percent",
"thresholds": {
"steps": [
{"color": "red", "value": 0},
{"color": "yellow", "value": 20},
{"color": "green", "value": 30}
]
}
}
}
},
{
"title": "充放电功率趋势",
"type": "timeseries",
"targets": [{
"rawSql": "SELECT time_bucket('1m', time) as time, AVG(value) FROM ess_telemetry WHERE tag_name='active_power' AND $__timeFilter(time) GROUP BY 1 ORDER BY 1"
}]
}
]
}报表生成
python
import pandas as pd
from datetime import datetime, timedelta
class ESSReportGenerator:
def __init__(self, db_connection):
self.db = db_connection
def generate_daily_report(self, site_id: str, date: datetime) -> dict:
"""生成日报表"""
start = date.replace(hour=0, minute=0, second=0)
end = start + timedelta(days=1)
# 查询充放电数据
df = pd.read_sql("""
SELECT
time_bucket('1 hour', time) as hour,
SUM(CASE WHEN value > 0 THEN value/3600 ELSE 0 END) as charge_kwh,
SUM(CASE WHEN value < 0 THEN ABS(value)/3600 ELSE 0 END) as discharge_kwh
FROM ess_telemetry
WHERE site_id = %s
AND tag_name = 'active_power'
AND time BETWEEN %s AND %s
GROUP BY hour
ORDER BY hour
""", self.db, params=[site_id, start, end])
total_charge = df['charge_kwh'].sum()
total_discharge = df['discharge_kwh'].sum()
efficiency = total_discharge / total_charge if total_charge > 0 else 0
return {
'date': date.strftime('%Y-%m-%d'),
'site_id': site_id,
'total_charge_kwh': round(total_charge, 2),
'total_discharge_kwh': round(total_discharge, 2),
'roundtrip_efficiency': round(efficiency * 100, 2),
'hourly_data': df.to_dict('records')
}