Skip to content

BMS 通信集成

BMS 通信接口

CAN 总线接口

大多数国内 BMS 厂商使用 CAN 2.0B 协议,波特率通常为 250kbps 或 500kbps。

python
import can
import struct
import time

class BMSCANReader:
    """通过 CAN 总线读取 BMS 数据"""

    # 典型 BMS CAN ID 定义(各厂商不同,需查手册)
    CAN_ID_BASIC_INFO = 0x18FF50E5      # 基本信息(SOC、电压、电流)
    CAN_ID_CELL_VOLTAGE = 0x18FF51E5    # 电芯电压
    CAN_ID_TEMPERATURE = 0x18FF52E5    # 温度
    CAN_ID_ALARM = 0x18FF53E5          # 告警信息

    def __init__(self, channel: str = 'can0', bitrate: int = 250000):
        self.bus = can.interface.Bus(
            channel=channel,
            bustype='socketcan',
            bitrate=bitrate
        )
        self.data = {}

    def start_reading(self):
        """启动 CAN 消息接收循环"""
        for msg in self.bus:
            self._process_message(msg)

    def _process_message(self, msg: can.Message):
        can_id = msg.arbitration_id
        data = msg.data

        if can_id == self.CAN_ID_BASIC_INFO:
            self._parse_basic_info(data)
        elif can_id == self.CAN_ID_CELL_VOLTAGE:
            self._parse_cell_voltage(data)
        elif can_id == self.CAN_ID_TEMPERATURE:
            self._parse_temperature(data)
        elif can_id == self.CAN_ID_ALARM:
            self._parse_alarm(data)

    def _parse_basic_info(self, data: bytes):
        """解析基本信息帧"""
        # 字节0-1:总电压(0.1V 精度)
        voltage_raw = struct.unpack('>H', data[0:2])[0]
        self.data['total_voltage'] = voltage_raw * 0.1

        # 字节2-3:总电流(0.1A 精度,有符号,正=充电)
        current_raw = struct.unpack('>h', data[2:4])[0]
        self.data['current'] = current_raw * 0.1

        # 字节4:SOC(1% 精度)
        self.data['soc'] = data[4]

        # 字节5:SOH(1% 精度)
        self.data['soh'] = data[5]

        # 字节6-7:剩余容量(0.1Ah 精度)
        remaining_raw = struct.unpack('>H', data[6:8])[0]
        self.data['remaining_capacity'] = remaining_raw * 0.1

    def _parse_cell_voltage(self, data: bytes):
        """解析电芯电压帧(每帧 4 个电芯)"""
        frame_index = data[0]  # 帧序号,从 0 开始
        base_cell = frame_index * 4

        for i in range(4):
            cell_voltage_raw = struct.unpack('>H', data[1+i*2:3+i*2])[0]
            cell_id = base_cell + i + 1
            self.data[f'cell_{cell_id:03d}_voltage'] = cell_voltage_raw * 0.001  # mV → V

Modbus RTU 接口

python
from pymodbus.client import ModbusSerialClient
import struct

class BMSModbusReader:
    """通过 Modbus RTU 读取 BMS 数据"""

    # 寄存器地址映射(以某国产 BMS 为例)
    REG_MAP = {
        'total_voltage':    (0x0000, 1, 0.1),   # 地址, 寄存器数, 系数
        'current':          (0x0001, 1, 0.1),
        'soc':              (0x0002, 1, 0.1),
        'soh':              (0x0003, 1, 0.1),
        'max_cell_voltage': (0x0010, 1, 0.001),
        'min_cell_voltage': (0x0011, 1, 0.001),
        'max_temperature':  (0x0020, 1, 0.1),
        'min_temperature':  (0x0021, 1, 0.1),
        'alarm_status':     (0x0100, 2, 1),     # 32位告警状态
    }

    def __init__(self, port: str, unit_id: int = 1):
        self.client = ModbusSerialClient(
            port=port,
            baudrate=9600,
            bytesize=8,
            parity='N',
            stopbits=1,
            timeout=1
        )
        self.unit_id = unit_id
        self.client.connect()

    def read_all(self) -> dict:
        """读取所有 BMS 数据"""
        result = {}

        # 批量读取基本数据(0x0000-0x0030)
        response = self.client.read_holding_registers(
            address=0x0000, count=0x30, slave=self.unit_id
        )
        if response.isError():
            return {}

        regs = response.registers

        result['total_voltage'] = regs[0x0000] * 0.1
        result['current'] = self._to_signed(regs[0x0001]) * 0.1
        result['soc'] = regs[0x0002] * 0.1
        result['soh'] = regs[0x0003] * 0.1
        result['max_cell_voltage'] = regs[0x0010] * 0.001
        result['min_cell_voltage'] = regs[0x0011] * 0.001
        result['max_temperature'] = self._to_signed(regs[0x0020]) * 0.1
        result['min_temperature'] = self._to_signed(regs[0x0021]) * 0.1

        # 读取告警状态
        alarm_response = self.client.read_holding_registers(
            address=0x0100, count=2, slave=self.unit_id
        )
        if not alarm_response.isError():
            alarm_word = (alarm_response.registers[0] << 16) | alarm_response.registers[1]
            result['alarms'] = self._parse_alarms(alarm_word)

        return result

    def _to_signed(self, value: int) -> int:
        """16位无符号转有符号"""
        return value if value < 32768 else value - 65536

    def _parse_alarms(self, alarm_word: int) -> list:
        """解析告警位"""
        alarm_bits = {
            0: "过压告警",
            1: "欠压告警",
            2: "过温告警",
            3: "低温告警",
            4: "过流充电告警",
            5: "过流放电告警",
            6: "SOC 低告警",
            7: "绝缘故障",
            8: "通信故障",
            16: "过压保护",
            17: "欠压保护",
            18: "过温保护",
            19: "过流保护",
        }
        active_alarms = []
        for bit, name in alarm_bits.items():
            if alarm_word & (1 << bit):
                active_alarms.append(name)
        return active_alarms

SOC 估算算法

python
import numpy as np

class KalmanSOCEstimator:
    """
    扩展卡尔曼滤波 SOC 估算
    状态量:[SOC, 极化电压]
    观测量:端电压
    """

    def __init__(self, capacity_ah: float, initial_soc: float = 0.5):
        self.capacity = capacity_ah * 3600  # 转换为 As

        # 状态向量 [SOC, V_polarization]
        self.x = np.array([[initial_soc], [0.0]])

        # 状态协方差矩阵
        self.P = np.diag([0.01, 0.001])

        # 过程噪声协方差
        self.Q = np.diag([1e-6, 1e-5])

        # 观测噪声协方差(电压测量噪声)
        self.R = np.array([[1e-4]])

        # 电池内阻(Ω)
        self.R0 = 0.002

        # 极化时间常数(s)
        self.tau = 30.0

        # 极化内阻(Ω)
        self.R1 = 0.001

    def ocv_from_soc(self, soc: float) -> float:
        """OCV-SOC 曲线(磷酸铁锂,需根据实际电池标定)"""
        # 简化的分段线性模型
        ocv_table = [
            (0.0, 2.50), (0.1, 3.00), (0.2, 3.20),
            (0.3, 3.25), (0.5, 3.30), (0.7, 3.35),
            (0.8, 3.40), (0.9, 3.50), (1.0, 3.65)
        ]
        soc = max(0.0, min(1.0, soc))
        for i in range(len(ocv_table) - 1):
            s1, v1 = ocv_table[i]
            s2, v2 = ocv_table[i + 1]
            if s1 <= soc <= s2:
                return v1 + (v2 - v1) * (soc - s1) / (s2 - s1)
        return ocv_table[-1][1]

    def update(self, current: float, voltage: float, dt: float) -> float:
        """
        更新 SOC 估算
        current: 电流(A,充电为正)
        voltage: 端电压(V)
        dt: 时间步长(s)
        """
        soc, v_pol = self.x[0, 0], self.x[1, 0]

        # 状态转移矩阵
        A = np.array([
            [1.0, 0.0],
            [0.0, np.exp(-dt / self.tau)]
        ])

        # 输入矩阵
        B = np.array([
            [-dt / self.capacity],
            [self.R1 * (1 - np.exp(-dt / self.tau))]
        ])

        # 预测步骤
        x_pred = A @ self.x + B * current
        P_pred = A @ self.P @ A.T + self.Q

        # 观测矩阵(线性化)
        soc_pred = x_pred[0, 0]
        docv_dsoc = (self.ocv_from_soc(soc_pred + 0.001) -
                     self.ocv_from_soc(soc_pred - 0.001)) / 0.002
        H = np.array([[docv_dsoc, 1.0]])

        # 预测电压
        v_pred = (self.ocv_from_soc(soc_pred) +
                  x_pred[1, 0] +
                  current * self.R0)

        # 卡尔曼增益
        S = H @ P_pred @ H.T + self.R
        K = P_pred @ H.T @ np.linalg.inv(S)

        # 更新步骤
        innovation = voltage - v_pred
        self.x = x_pred + K * innovation
        self.P = (np.eye(2) - K @ H) @ P_pred

        # 限制 SOC 范围
        self.x[0, 0] = max(0.0, min(1.0, self.x[0, 0]))

        return self.x[0, 0]

多簇均衡管理

python
class MultiClusterBalancer:
    """
    多电池簇均衡管理
    策略:SOC 差异超过阈值时,通过调整各簇充放电功率实现均衡
    """

    SOC_BALANCE_THRESHOLD = 0.05  # SOC 差异超过 5% 触发均衡

    def __init__(self, num_clusters: int, total_power_kw: float):
        self.num_clusters = num_clusters
        self.total_power = total_power_kw

    def calculate_power_distribution(self, soc_list: list,
                                      total_power: float) -> list:
        """
        计算各簇功率分配
        soc_list: 各簇 SOC 列表
        total_power: 总功率(正=充电,负=放电)
        返回:各簇功率分配列表
        """
        n = len(soc_list)
        avg_soc = sum(soc_list) / n
        max_soc = max(soc_list)
        min_soc = min(soc_list)

        # SOC 差异小,均匀分配
        if max_soc - min_soc < self.SOC_BALANCE_THRESHOLD:
            return [total_power / n] * n

        # SOC 差异大,按 SOC 偏差加权分配
        if total_power > 0:  # 充电:SOC 低的多充
            weights = [avg_soc - soc + 1.0 for soc in soc_list]
        else:  # 放电:SOC 高的多放
            weights = [soc - avg_soc + 1.0 for soc in soc_list]

        total_weight = sum(weights)
        powers = [total_power * w / total_weight for w in weights]

        return powers

褚成志的IoT笔记