BMS 通信集成
BMS 通信接口
CAN 总线接口
大多数国内 BMS 厂商使用 CAN 2.0B 协议,波特率通常为 250kbps 或 500kbps。
python
import can
import struct
import time
class BMSCANReader:
"""通过 CAN 总线读取 BMS 数据"""
# 典型 BMS CAN ID 定义(各厂商不同,需查手册)
CAN_ID_BASIC_INFO = 0x18FF50E5 # 基本信息(SOC、电压、电流)
CAN_ID_CELL_VOLTAGE = 0x18FF51E5 # 电芯电压
CAN_ID_TEMPERATURE = 0x18FF52E5 # 温度
CAN_ID_ALARM = 0x18FF53E5 # 告警信息
def __init__(self, channel: str = 'can0', bitrate: int = 250000):
self.bus = can.interface.Bus(
channel=channel,
bustype='socketcan',
bitrate=bitrate
)
self.data = {}
def start_reading(self):
"""启动 CAN 消息接收循环"""
for msg in self.bus:
self._process_message(msg)
def _process_message(self, msg: can.Message):
can_id = msg.arbitration_id
data = msg.data
if can_id == self.CAN_ID_BASIC_INFO:
self._parse_basic_info(data)
elif can_id == self.CAN_ID_CELL_VOLTAGE:
self._parse_cell_voltage(data)
elif can_id == self.CAN_ID_TEMPERATURE:
self._parse_temperature(data)
elif can_id == self.CAN_ID_ALARM:
self._parse_alarm(data)
def _parse_basic_info(self, data: bytes):
"""解析基本信息帧"""
# 字节0-1:总电压(0.1V 精度)
voltage_raw = struct.unpack('>H', data[0:2])[0]
self.data['total_voltage'] = voltage_raw * 0.1
# 字节2-3:总电流(0.1A 精度,有符号,正=充电)
current_raw = struct.unpack('>h', data[2:4])[0]
self.data['current'] = current_raw * 0.1
# 字节4:SOC(1% 精度)
self.data['soc'] = data[4]
# 字节5:SOH(1% 精度)
self.data['soh'] = data[5]
# 字节6-7:剩余容量(0.1Ah 精度)
remaining_raw = struct.unpack('>H', data[6:8])[0]
self.data['remaining_capacity'] = remaining_raw * 0.1
def _parse_cell_voltage(self, data: bytes):
"""解析电芯电压帧(每帧 4 个电芯)"""
frame_index = data[0] # 帧序号,从 0 开始
base_cell = frame_index * 4
for i in range(4):
cell_voltage_raw = struct.unpack('>H', data[1+i*2:3+i*2])[0]
cell_id = base_cell + i + 1
self.data[f'cell_{cell_id:03d}_voltage'] = cell_voltage_raw * 0.001 # mV → VModbus RTU 接口
python
from pymodbus.client import ModbusSerialClient
import struct
class BMSModbusReader:
"""通过 Modbus RTU 读取 BMS 数据"""
# 寄存器地址映射(以某国产 BMS 为例)
REG_MAP = {
'total_voltage': (0x0000, 1, 0.1), # 地址, 寄存器数, 系数
'current': (0x0001, 1, 0.1),
'soc': (0x0002, 1, 0.1),
'soh': (0x0003, 1, 0.1),
'max_cell_voltage': (0x0010, 1, 0.001),
'min_cell_voltage': (0x0011, 1, 0.001),
'max_temperature': (0x0020, 1, 0.1),
'min_temperature': (0x0021, 1, 0.1),
'alarm_status': (0x0100, 2, 1), # 32位告警状态
}
def __init__(self, port: str, unit_id: int = 1):
self.client = ModbusSerialClient(
port=port,
baudrate=9600,
bytesize=8,
parity='N',
stopbits=1,
timeout=1
)
self.unit_id = unit_id
self.client.connect()
def read_all(self) -> dict:
"""读取所有 BMS 数据"""
result = {}
# 批量读取基本数据(0x0000-0x0030)
response = self.client.read_holding_registers(
address=0x0000, count=0x30, slave=self.unit_id
)
if response.isError():
return {}
regs = response.registers
result['total_voltage'] = regs[0x0000] * 0.1
result['current'] = self._to_signed(regs[0x0001]) * 0.1
result['soc'] = regs[0x0002] * 0.1
result['soh'] = regs[0x0003] * 0.1
result['max_cell_voltage'] = regs[0x0010] * 0.001
result['min_cell_voltage'] = regs[0x0011] * 0.001
result['max_temperature'] = self._to_signed(regs[0x0020]) * 0.1
result['min_temperature'] = self._to_signed(regs[0x0021]) * 0.1
# 读取告警状态
alarm_response = self.client.read_holding_registers(
address=0x0100, count=2, slave=self.unit_id
)
if not alarm_response.isError():
alarm_word = (alarm_response.registers[0] << 16) | alarm_response.registers[1]
result['alarms'] = self._parse_alarms(alarm_word)
return result
def _to_signed(self, value: int) -> int:
"""16位无符号转有符号"""
return value if value < 32768 else value - 65536
def _parse_alarms(self, alarm_word: int) -> list:
"""解析告警位"""
alarm_bits = {
0: "过压告警",
1: "欠压告警",
2: "过温告警",
3: "低温告警",
4: "过流充电告警",
5: "过流放电告警",
6: "SOC 低告警",
7: "绝缘故障",
8: "通信故障",
16: "过压保护",
17: "欠压保护",
18: "过温保护",
19: "过流保护",
}
active_alarms = []
for bit, name in alarm_bits.items():
if alarm_word & (1 << bit):
active_alarms.append(name)
return active_alarmsSOC 估算算法
python
import numpy as np
class KalmanSOCEstimator:
"""
扩展卡尔曼滤波 SOC 估算
状态量:[SOC, 极化电压]
观测量:端电压
"""
def __init__(self, capacity_ah: float, initial_soc: float = 0.5):
self.capacity = capacity_ah * 3600 # 转换为 As
# 状态向量 [SOC, V_polarization]
self.x = np.array([[initial_soc], [0.0]])
# 状态协方差矩阵
self.P = np.diag([0.01, 0.001])
# 过程噪声协方差
self.Q = np.diag([1e-6, 1e-5])
# 观测噪声协方差(电压测量噪声)
self.R = np.array([[1e-4]])
# 电池内阻(Ω)
self.R0 = 0.002
# 极化时间常数(s)
self.tau = 30.0
# 极化内阻(Ω)
self.R1 = 0.001
def ocv_from_soc(self, soc: float) -> float:
"""OCV-SOC 曲线(磷酸铁锂,需根据实际电池标定)"""
# 简化的分段线性模型
ocv_table = [
(0.0, 2.50), (0.1, 3.00), (0.2, 3.20),
(0.3, 3.25), (0.5, 3.30), (0.7, 3.35),
(0.8, 3.40), (0.9, 3.50), (1.0, 3.65)
]
soc = max(0.0, min(1.0, soc))
for i in range(len(ocv_table) - 1):
s1, v1 = ocv_table[i]
s2, v2 = ocv_table[i + 1]
if s1 <= soc <= s2:
return v1 + (v2 - v1) * (soc - s1) / (s2 - s1)
return ocv_table[-1][1]
def update(self, current: float, voltage: float, dt: float) -> float:
"""
更新 SOC 估算
current: 电流(A,充电为正)
voltage: 端电压(V)
dt: 时间步长(s)
"""
soc, v_pol = self.x[0, 0], self.x[1, 0]
# 状态转移矩阵
A = np.array([
[1.0, 0.0],
[0.0, np.exp(-dt / self.tau)]
])
# 输入矩阵
B = np.array([
[-dt / self.capacity],
[self.R1 * (1 - np.exp(-dt / self.tau))]
])
# 预测步骤
x_pred = A @ self.x + B * current
P_pred = A @ self.P @ A.T + self.Q
# 观测矩阵(线性化)
soc_pred = x_pred[0, 0]
docv_dsoc = (self.ocv_from_soc(soc_pred + 0.001) -
self.ocv_from_soc(soc_pred - 0.001)) / 0.002
H = np.array([[docv_dsoc, 1.0]])
# 预测电压
v_pred = (self.ocv_from_soc(soc_pred) +
x_pred[1, 0] +
current * self.R0)
# 卡尔曼增益
S = H @ P_pred @ H.T + self.R
K = P_pred @ H.T @ np.linalg.inv(S)
# 更新步骤
innovation = voltage - v_pred
self.x = x_pred + K * innovation
self.P = (np.eye(2) - K @ H) @ P_pred
# 限制 SOC 范围
self.x[0, 0] = max(0.0, min(1.0, self.x[0, 0]))
return self.x[0, 0]多簇均衡管理
python
class MultiClusterBalancer:
"""
多电池簇均衡管理
策略:SOC 差异超过阈值时,通过调整各簇充放电功率实现均衡
"""
SOC_BALANCE_THRESHOLD = 0.05 # SOC 差异超过 5% 触发均衡
def __init__(self, num_clusters: int, total_power_kw: float):
self.num_clusters = num_clusters
self.total_power = total_power_kw
def calculate_power_distribution(self, soc_list: list,
total_power: float) -> list:
"""
计算各簇功率分配
soc_list: 各簇 SOC 列表
total_power: 总功率(正=充电,负=放电)
返回:各簇功率分配列表
"""
n = len(soc_list)
avg_soc = sum(soc_list) / n
max_soc = max(soc_list)
min_soc = min(soc_list)
# SOC 差异小,均匀分配
if max_soc - min_soc < self.SOC_BALANCE_THRESHOLD:
return [total_power / n] * n
# SOC 差异大,按 SOC 偏差加权分配
if total_power > 0: # 充电:SOC 低的多充
weights = [avg_soc - soc + 1.0 for soc in soc_list]
else: # 放电:SOC 高的多放
weights = [soc - avg_soc + 1.0 for soc in soc_list]
total_weight = sum(weights)
powers = [total_power * w / total_weight for w in weights]
return powers