Skip to content

储能数据采集与监控(SCADA)

SCADA 系统架构

储能 SCADA 系统组成:

数据采集层:
├── 实时数据库(Redis):毫秒级实时数据
├── 时序数据库(TimescaleDB/InfluxDB):历史数据
└── 关系数据库(PostgreSQL):配置、告警、报表

展示层:
├── Web HMI(Vue.js + ECharts)
├── 移动端 App
└── 大屏展示系统

告警系统:
├── 实时告警引擎
├── 告警推送(短信/邮件/企业微信)
└── 告警历史记录

报表系统:
├── 日/月/年报表
├── 充放电统计
└── 效率分析

时序数据库选型

TimescaleDB(推荐)

sql
-- 创建储能数据表(TimescaleDB 超表)
CREATE TABLE ess_telemetry (
    time        TIMESTAMPTZ NOT NULL,
    site_id     VARCHAR(50) NOT NULL,
    device_id   VARCHAR(100) NOT NULL,
    tag_name    VARCHAR(100) NOT NULL,
    value       DOUBLE PRECISION,
    quality     SMALLINT DEFAULT 0
);

-- 转换为超表(按时间自动分区)
SELECT create_hypertable('ess_telemetry', 'time',
    chunk_time_interval => INTERVAL '1 day');

-- 创建索引
CREATE INDEX ON ess_telemetry (site_id, device_id, tag_name, time DESC);

-- 启用压缩(7天后自动压缩)
ALTER TABLE ess_telemetry SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'site_id, device_id, tag_name'
);
SELECT add_compression_policy('ess_telemetry', INTERVAL '7 days');

-- 数据保留策略(保留 2 年)
SELECT add_retention_policy('ess_telemetry', INTERVAL '2 years');
sql
-- 常用查询

-- 最新值
SELECT DISTINCT ON (device_id, tag_name)
    device_id, tag_name, value, time
FROM ess_telemetry
WHERE site_id = 'site001'
ORDER BY device_id, tag_name, time DESC;

-- 1分钟均值(降采样)
SELECT
    time_bucket('1 minute', time) AS bucket,
    device_id,
    tag_name,
    AVG(value) AS avg_value,
    MAX(value) AS max_value,
    MIN(value) AS min_value
FROM ess_telemetry
WHERE site_id = 'site001'
  AND time >= NOW() - INTERVAL '1 hour'
GROUP BY bucket, device_id, tag_name
ORDER BY bucket;

-- 连续聚合视图(预计算,提升查询性能)
CREATE MATERIALIZED VIEW ess_telemetry_1min
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', time) AS bucket,
    site_id, device_id, tag_name,
    AVG(value) AS avg_val,
    MAX(value) AS max_val,
    MIN(value) AS min_val,
    COUNT(*) AS sample_count
FROM ess_telemetry
GROUP BY bucket, site_id, device_id, tag_name;

SELECT add_continuous_aggregate_policy('ess_telemetry_1min',
    start_offset => INTERVAL '1 hour',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

告警引擎

python
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional, Callable
import time

class AlarmLevel(IntEnum):
    INFO = 1
    WARNING = 2
    ALARM = 3
    CRITICAL = 4

@dataclass
class AlarmRule:
    rule_id: str
    device_id: str
    tag_name: str
    condition: str          # 'gt', 'lt', 'eq', 'ne'
    threshold: float
    level: AlarmLevel
    message_template: str
    delay_seconds: float = 0    # 延迟确认(避免抖动)
    recovery_threshold: Optional[float] = None

class AlarmEngine:
    def __init__(self, rules: list, notify_callback: Callable):
        self.rules = {r.rule_id: r for r in rules}
        self.notify = notify_callback
        self.active_alarms = {}     # {rule_id: alarm_start_time}
        self.pending_alarms = {}    # {rule_id: first_trigger_time}

    def check(self, device_id: str, tag_name: str, value: float):
        for rule in self.rules.values():
            if rule.device_id != device_id or rule.tag_name != tag_name:
                continue

            triggered = self._evaluate(value, rule.condition, rule.threshold)

            if triggered:
                self._handle_trigger(rule, value)
            else:
                self._handle_recovery(rule, value)

    def _evaluate(self, value: float, condition: str, threshold: float) -> bool:
        ops = {
            'gt': value > threshold,
            'gte': value >= threshold,
            'lt': value < threshold,
            'lte': value <= threshold,
            'eq': abs(value - threshold) < 1e-6,
        }
        return ops.get(condition, False)

    def _handle_trigger(self, rule: AlarmRule, value: float):
        now = time.time()

        if rule.rule_id not in self.pending_alarms:
            self.pending_alarms[rule.rule_id] = now

        # 延迟确认:持续触发超过 delay_seconds 才告警
        if (now - self.pending_alarms[rule.rule_id] >= rule.delay_seconds
                and rule.rule_id not in self.active_alarms):
            self.active_alarms[rule.rule_id] = now
            alarm = {
                'rule_id': rule.rule_id,
                'device_id': rule.device_id,
                'tag_name': rule.tag_name,
                'value': value,
                'threshold': rule.threshold,
                'level': rule.level,
                'message': rule.message_template.format(value=value),
                'timestamp': now
            }
            self.notify(alarm)

    def _handle_recovery(self, rule: AlarmRule, value: float):
        self.pending_alarms.pop(rule.rule_id, None)

        if rule.rule_id in self.active_alarms:
            # 检查恢复阈值(防止在阈值附近反复告警)
            if rule.recovery_threshold is not None:
                if not self._evaluate(value, rule.condition, rule.recovery_threshold):
                    del self.active_alarms[rule.rule_id]
                    self.notify({
                        'rule_id': rule.rule_id,
                        'type': 'recovery',
                        'timestamp': time.time()
                    })
            else:
                del self.active_alarms[rule.rule_id]

# 配置示例
rules = [
    AlarmRule(
        rule_id="bms_rack01_overvoltage",
        device_id="bms_rack01",
        tag_name="max_cell_voltage",
        condition="gt",
        threshold=3.65,
        level=AlarmLevel.CRITICAL,
        message_template="电芯过压告警:{value:.3f}V",
        delay_seconds=3,
        recovery_threshold=3.60
    ),
    AlarmRule(
        rule_id="bms_rack01_high_temp",
        device_id="bms_rack01",
        tag_name="max_temperature",
        condition="gt",
        threshold=45.0,
        level=AlarmLevel.WARNING,
        message_template="电池温度偏高:{value:.1f}°C",
        delay_seconds=10,
        recovery_threshold=40.0
    ),
]

Grafana 监控面板

json
// Grafana Dashboard 配置片段(储能系统概览)
{
  "panels": [
    {
      "title": "系统 SOC",
      "type": "gauge",
      "datasource": "TimescaleDB",
      "targets": [{
        "rawSql": "SELECT time, value FROM ess_telemetry WHERE device_id='system' AND tag_name='soc' ORDER BY time DESC LIMIT 1",
        "format": "time_series"
      }],
      "fieldConfig": {
        "defaults": {
          "min": 0, "max": 100,
          "unit": "percent",
          "thresholds": {
            "steps": [
              {"color": "red", "value": 0},
              {"color": "yellow", "value": 20},
              {"color": "green", "value": 30}
            ]
          }
        }
      }
    },
    {
      "title": "充放电功率趋势",
      "type": "timeseries",
      "targets": [{
        "rawSql": "SELECT time_bucket('1m', time) as time, AVG(value) FROM ess_telemetry WHERE tag_name='active_power' AND $__timeFilter(time) GROUP BY 1 ORDER BY 1"
      }]
    }
  ]
}

报表生成

python
import pandas as pd
from datetime import datetime, timedelta

class ESSReportGenerator:
    def __init__(self, db_connection):
        self.db = db_connection

    def generate_daily_report(self, site_id: str, date: datetime) -> dict:
        """生成日报表"""
        start = date.replace(hour=0, minute=0, second=0)
        end = start + timedelta(days=1)

        # 查询充放电数据
        df = pd.read_sql("""
            SELECT
                time_bucket('1 hour', time) as hour,
                SUM(CASE WHEN value > 0 THEN value/3600 ELSE 0 END) as charge_kwh,
                SUM(CASE WHEN value < 0 THEN ABS(value)/3600 ELSE 0 END) as discharge_kwh
            FROM ess_telemetry
            WHERE site_id = %s
              AND tag_name = 'active_power'
              AND time BETWEEN %s AND %s
            GROUP BY hour
            ORDER BY hour
        """, self.db, params=[site_id, start, end])

        total_charge = df['charge_kwh'].sum()
        total_discharge = df['discharge_kwh'].sum()
        efficiency = total_discharge / total_charge if total_charge > 0 else 0

        return {
            'date': date.strftime('%Y-%m-%d'),
            'site_id': site_id,
            'total_charge_kwh': round(total_charge, 2),
            'total_discharge_kwh': round(total_discharge, 2),
            'roundtrip_efficiency': round(efficiency * 100, 2),
            'hourly_data': df.to_dict('records')
        }

褚成志的IoT笔记