Skip to content

MQTT 最佳实践

主题设计规范

层级结构原则

推荐结构:{组织}/{站点}/{设备类型}/{设备ID}/{数据类型}(告警、控制指令等跨设备的消息类别可以把类别层级提到设备类型之前,如下例中的 alarm/、cmd/,便于用一个通配符统一订阅)

示例(储能系统):
ess/site001/bms/rack01/cell/001/voltage    # 电芯电压
ess/site001/bms/rack01/soc                 # 电池组 SOC
ess/site001/pcs/unit01/ac/power            # PCS 交流功率
ess/site001/ems/dispatch/setpoint          # 调度设定值
ess/site001/alarm/bms/rack01/overvoltage   # 告警
ess/site001/cmd/pcs/unit01/start           # 控制指令

主题设计禁忌

❌ 错误示例:
/data                          # 太宽泛,无法过滤;且以 / 开头会产生一个空的首层级
device001temperature           # 无层级,不可扩展
ess/site001/bms/rack01/#/data  # 多级通配符 # 只能位于末尾(单级通配符 + 可以在中间);且通配符只能用于订阅,不能用于发布
$SYS/custom/topic              # 以 $ 开头的主题(如 $SYS)保留给 Broker 内部使用

✅ 正确示例:
ess/site001/bms/rack01/temperature
ess/+/bms/+/temperature        # 订阅所有站点所有 BMS 温度
ess/site001/#                  # 订阅 site001 所有数据

命令与响应模式

# 请求-响应模式(MQTT 5.0 Response Topic)
发布控制指令:
  Topic:    cmd/device001/set_power
  Payload:  {"value": 100, "unit": "kW"}
  Response-Topic: cmd/device001/set_power/response
  Correlation-Data: "req-uuid-12345"

设备响应:
  Topic:    cmd/device001/set_power/response
  Payload:  {"status": "ok", "actual_value": 100}
  Correlation-Data: "req-uuid-12345"  # 对应请求

连接管理

客户端 ID 规范

python
import uuid
import socket

def generate_client_id(device_type: str, device_id: str, max_len: int = 23) -> str:
    """Generate a unique, human-readable MQTT client ID.

    Format: ``{device_type}-{device_id}-{suffix}``, where ``suffix`` is the
    first 8 hex characters of a random UUID4.

    Both MQTT 3.1.1 and MQTT 5.0 only guarantee that a broker accepts client
    IDs of 1-23 UTF-8 bytes made of ``[0-9a-zA-Z]``; longer IDs (and other
    characters such as ``-``) are broker-dependent. The result is therefore
    limited to ``max_len`` characters (23 by default). When it would be
    longer, the readable prefix is shortened and the random suffix is always
    kept whole: the suffix is what makes the ID unique, and two clients that
    share an ID keep disconnecting each other on the broker.

    Length is counted in characters, which equals bytes only for ASCII input.

    Args:
        device_type: Device category, e.g. ``"bms"``.
        device_id: Device identifier, e.g. ``"rack01"``.
        max_len: Maximum length of the returned ID; must be >= 1.

    Returns:
        The client ID, at most ``max_len`` characters long.

    Raises:
        ValueError: If ``max_len`` is less than 1.
    """
    if max_len < 1:
        raise ValueError(f"max_len must be >= 1, got {max_len}")
    suffix = uuid.uuid4().hex[:8]
    prefix = f"{device_type}-{device_id}-"
    # Room left for the readable prefix after reserving the unique suffix.
    room = max(max_len - len(suffix), 0)
    return (prefix[:room] + suffix)[:max_len]

# Example
client_id = generate_client_id("bms", "rack01")  # e.g. bms-rack01-a3f2b1c4

断线重连策略

python
import paho.mqtt.client as mqtt
import time
import logging

class RobustMQTTClient:
    """MQTT gateway client with auto-reconnect, a retained online/offline
    status topic, and re-subscription after every successful connect.

    Network I/O runs on paho-mqtt's background thread (``loop_start``) and
    reconnects are handled by paho itself (``connect_async`` plus
    ``reconnect_delay_set``). Callback signatures follow the paho-mqtt 1.x
    callback API.
    """

    # Retained status topic. The broker publishes OFFLINE_PAYLOAD here (the
    # will) when the connection drops ungracefully; _on_connect overwrites it
    # with ONLINE_PAYLOAD after every successful (re)connect.
    STATUS_TOPIC = "gateway/001/status"
    ONLINE_PAYLOAD = '{"online":true}'
    OFFLINE_PAYLOAD = '{"online":false}'

    def __init__(self, broker_host, broker_port=1883, client_id=None):
        """Create the client; nothing is sent until ``connect()`` is called.

        Args:
            broker_host: Broker hostname or IP address.
            broker_port: Broker TCP port (default 1883).
            client_id: MQTT client ID. Defaults to a freshly generated
                ``gateway-001-<random>`` ID. The session is persistent
                (``clean_session=False``) and the broker finds sessions by
                client ID, so pass a stable ID if queued messages must
                survive a process restart; a random ID only preserves the
                session across reconnects within this process.
        """
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.reconnect_delay = 1          # minimum reconnect back-off (s)
        self.max_reconnect_delay = 60     # maximum reconnect back-off (s)
        # Optional hook, called as message_handler(topic, payload) per message.
        self.message_handler = None
        if client_id is None:
            client_id = generate_client_id("gateway", "001")
        self.client = mqtt.Client(
            client_id=client_id,
            clean_session=False  # persistent session: keep state while offline
        )
        self._setup_callbacks()

    def _setup_callbacks(self):
        """Wire the paho callbacks to this instance's handlers."""
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, rc):
        """Handle CONNACK: announce online status and (re)subscribe on success."""
        if rc == 0:
            logging.info("MQTT connected successfully")
            self.reconnect_delay = 1  # reset the back-off
            # Overwrite the retained offline will left by an earlier drop;
            # without this the status topic would never report online.
            client.publish(self.STATUS_TOPIC, self.ONLINE_PAYLOAD, qos=1, retain=True)
            # Re-subscribe explicitly (a persistent session should restore
            # subscriptions, but re-subscribing is the safer choice).
            self._resubscribe()
        else:
            logging.error(f"MQTT connect failed, rc={rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Log unexpected disconnects; reconnecting is left to paho."""
        if rc != 0:
            logging.warning(f"Unexpected disconnect, rc={rc}")
            # connect_async + loop_start: paho reconnects automatically.

    def _on_message(self, client, userdata, msg):
        """Dispatch an incoming message to ``message_handler`` if one is set.

        Handler exceptions are logged and swallowed, because an exception
        escaping a paho callback can stop the background network loop.
        """
        handler = self.message_handler
        if handler is None:
            logging.debug("Unhandled MQTT message on %s", msg.topic)
            return
        try:
            handler(msg.topic, msg.payload)
        except Exception:
            logging.exception("Error handling MQTT message on %s", msg.topic)

    def connect(self):
        """Configure the will and back-off, then connect in the background."""
        # The will must be set before connecting: it travels in CONNECT.
        self.client.will_set(
            self.STATUS_TOPIC,
            self.OFFLINE_PAYLOAD,
            qos=1,
            retain=True
        )
        # Enable paho's automatic reconnect with exponential back-off.
        self.client.reconnect_delay_set(
            min_delay=self.reconnect_delay,
            max_delay=self.max_reconnect_delay
        )
        self.client.connect_async(self.broker_host, self.broker_port, keepalive=60)
        self.client.loop_start()

    def disconnect(self):
        """Publish offline status, disconnect cleanly and stop the network thread.

        A clean DISCONNECT suppresses the will, so the offline status is
        queued explicitly ahead of the DISCONNECT packet.
        """
        self.client.publish(self.STATUS_TOPIC, self.OFFLINE_PAYLOAD, qos=1, retain=True)
        self.client.disconnect()
        self.client.loop_stop()

    def _resubscribe(self):
        """Subscribe (at QoS 1) to this gateway's command and config topics."""
        topics = [
            ("cmd/gateway/001/#", 1),
            ("config/gateway/001", 1),
        ]
        self.client.subscribe(topics)

Keep Alive 配置

python
# Picking a keepalive interval:
#   stable network              -> 60-120 s
#   mobile network              -> 30-60 s (NAT idle timeouts are often ~30 s)
#   satellite / very poor link  -> 10-30 s

client.connect(host=broker, port=port, keepalive=60)

# Broker side (EMQX), in emqx.conf:
# mqtt.keepalive_backoff = 0.75  # tolerates a timeout of keepalive * 1.5
# NOTE(review): newer EMQX 5.x releases express this as mqtt.keepalive_multiplier; confirm for your version.

消息设计

Payload 格式规范

python
import json
import time
from dataclasses import dataclass, asdict

@dataclass
class SensorData:
    """One sensor reading, serialized field-by-field via ``asdict``."""

    device_id: str
    timestamp: int          # Unix timestamp in milliseconds
    value: float
    unit: str
    quality: int            # 0 = good, 1 = uncertain, 2 = bad

def publish_sensor_data(client, device_id: str, value: float, *,
                        unit: str = "kW", quality: int = 0, topic=None):
    """Publish one reading as compact JSON at QoS 1.

    Args:
        client: A connected MQTT client exposing ``publish`` (e.g. paho-mqtt).
        device_id: Device identifier; also used as a topic level.
        value: The measured value.
        unit: Engineering unit of ``value`` (default ``"kW"``).
        quality: Quality code, 0 = good, 1 = uncertain, 2 = bad (default 0).
        topic: Topic to publish on. Defaults to
            ``ess/site001/pcs/{device_id}/power``.

    Returns:
        Whatever ``client.publish`` returns, so callers can check the result.
    """
    data = SensorData(
        device_id=device_id,
        timestamp=int(time.time() * 1000),
        value=value,
        unit=unit,
        quality=quality
    )
    payload = json.dumps(asdict(data), separators=(',', ':'))  # compact form
    if topic is None:
        topic = f"ess/site001/pcs/{device_id}/power"
    return client.publish(
        topic,
        payload,
        qos=1
    )

批量数据上报

python
# For high-frequency data, pack many readings into one message to cut
# per-message overhead (fixed header, topic string, QoS 1 acknowledgements).
def publish_batch(client, device_id: str, readings):
    """Publish a batch of readings for one device as one compact JSON message.

    Intended for cases such as sampling every 100 ms but reporting once per
    second.

    Args:
        client: A connected MQTT client exposing ``publish``.
        device_id: Device identifier; also used as a topic level.
        readings: Iterable of JSON-serializable readings, e.g.
            ``[{"ts": ..., "value": ...}, ...]``; generators are accepted.

    Returns:
        Whatever ``client.publish`` returns.
    """
    readings = list(readings)  # materialize so len() works for any iterable
    batch = {
        "device_id": device_id,
        "timestamp": int(time.time() * 1000),  # publish time, Unix ms
        "count": len(readings),
        "data": readings
    }
    return client.publish(
        f"ess/site001/bms/{device_id}/batch",
        json.dumps(batch, separators=(',', ':')),  # compact: no whitespace
        qos=1
    )

使用 MessagePack 减小负载体积(二进制序列化格式,并非压缩算法)

python
import msgpack

# MessagePack is a binary serialization format (not a compression algorithm);
# for numeric-heavy data it is commonly quoted as 30-50% smaller than JSON.
def publish_compressed(client, topic: str, data: dict):
    """Encode ``data`` as MessagePack and publish it to ``topic`` at QoS 1."""
    client.publish(topic, msgpack.packb(data, use_bin_type=True), qos=1)

def on_message_compressed(client, userdata, msg):
    """paho ``on_message`` callback: decode a MessagePack payload and process it.

    Undecodable payloads are logged and dropped rather than raised. Any
    publisher on a subscribed topic can send malformed data, and an exception
    escaping a paho callback can stop the client's network loop.
    """
    try:
        data = msgpack.unpackb(msg.payload, raw=False)
    except Exception:  # msgpack notes unpackb may raise non-msgpack exceptions too
        logging.exception("Dropping undecodable MessagePack payload on %s", msg.topic)
        return
    process_data(data)

QoS 选择策略

python
# QoS decision table, grouped by level:
#   QoS 0 - high-frequency telemetry; occasional loss is acceptable.
#   QoS 1 - important state; must not be lost, duplicates are tolerable.
#   QoS 2 - critical control / billing; must not be executed twice.
#           (QoS 2 is exactly-once per client<->broker hop; end-to-end
#           idempotence is still up to the application.)
_QOS_LEVELS = {
    0: (
        "telemetry_high_freq",   # sensor data sampled every 100 ms
        "heartbeat",
    ),
    1: (
        "status_change",         # device state transitions
        "alarm",
        "telemetry_low_freq",    # low-rate but important data
    ),
    2: (
        "control_command",       # start/stop, setpoints
        "billing_data",
    ),
}

_QOS_BY_MESSAGE_TYPE = {
    name: level for level, names in _QOS_LEVELS.items() for name in names
}


def choose_qos(message_type: str) -> int:
    """Return the QoS level for ``message_type``; unknown types get QoS 1."""
    return _QOS_BY_MESSAGE_TYPE.get(message_type, 1)

持久会话与离线消息

python
# Persistent session (clean_session=False): the broker keeps this client's
# subscriptions and undelivered messages while it is offline. Use it for
# devices that drop off often but must receive what was sent meanwhile.

client = mqtt.Client(
    clean_session=False,     # keep session state on the broker
    client_id="device-001",  # fixed ID: the broker finds the session by client ID
)

# Offline delivery needs BOTH a persistent session AND QoS 1/2 (on the
# publish and on the subscription). Queuing QoS 0 messages for an offline
# client is optional in the MQTT spec; EMQX controls it with mqueue_store_qos0.

# Broker-side offline queue settings (EMQX):
# mqtt.max_mqueue_len = 1000
# mqtt.mqueue_store_qos0 = false

安全最佳实践

python
import ssl

def create_secure_client(ca_cert, client_cert, client_key, *,
                         client_id="secure-device-001",
                         username="device-001",
                         password="strong-password-here"):
    """Create a paho-mqtt client set up for mutual TLS plus username/password.

    Args:
        ca_cert: Path to the CA bundle used to verify the broker certificate.
        client_cert: Path to this device's certificate (mutual TLS).
        client_key: Path to the private key matching ``client_cert``.
        client_id: MQTT client ID; keep it stable so the persistent session
            (``clean_session=False``) can be resumed.
        username: MQTT username.
        password: MQTT password. The default is only a placeholder; load the
            real secret from secure storage (environment, secret manager,
            secure element) instead of hard-coding it in source.

    Returns:
        The configured client; it is not connected yet.
    """
    client = mqtt.Client(
        client_id=client_id,
        clean_session=False
    )

    # TLS configuration
    client.tls_set(
        ca_certs=ca_cert,
        certfile=client_cert,
        keyfile=client_key,
        tls_version=ssl.PROTOCOL_TLS_CLIENT,
        # Limits TLS <= 1.2 suites to forward-secret AEAD ciphers; TLS 1.3
        # suites are not affected by a cipher string.
        ciphers="ECDHE+AESGCM:ECDHE+CHACHA20"
    )
    client.tls_insecure_set(False)  # keep broker hostname verification on

    # Username/password on top of the client certificate (two factors)
    client.username_pw_set(username, password)

    return client

性能调优

python
# Tuning for high-throughput scenarios

# 1. Publish asynchronously: run network I/O on a background thread.
client.loop_start()  # background thread handles network I/O

# 2. For bulk publishing, tune the in-flight window
#    (how many QoS 1/2 messages may await acknowledgement at once).
# EMQX setting: mqtt.max_inflight = 32 (default)

# 3. Keep callbacks fast: hand messages to a worker thread instead of
#    processing them on the MQTT network thread.
import queue
import threading

message_queue = queue.Queue(maxsize=10000)

def on_message(client, userdata, msg):
    """paho on_message callback: enqueue without blocking the network thread.

    When the queue is full the message is dropped and a warning is logged.
    """
    try:
        message_queue.put_nowait(msg)
    except queue.Full:
        logging.warning("Message queue full, dropping message")

def process_worker():
    """Consume ``message_queue`` forever, processing one message at a time.

    A failure in ``process_message`` is logged and the loop continues.
    Otherwise a single bad message would end this worker thread (the
    traceback only goes to stderr), after which the queue fills up and every
    later message is dropped.
    """
    while True:
        msg = message_queue.get()
        try:
            process_message(msg)
        except Exception:
            logging.exception(
                "Failed to process message on %s", getattr(msg, "topic", "?")
            )
        finally:
            message_queue.task_done()  # lets message_queue.join() be used to flush

# Start the worker thread (daemon: it does not keep the process alive on exit)
threading.Thread(target=process_worker, daemon=True).start()

监控指标

bash
# Key EMQX 5 metrics via the HTTP API (dashboard port 18083).
# The v5 REST API requires authentication: create an API key in the
# dashboard and pass key/secret with HTTP Basic auth.
curl -s -u "$EMQX_API_KEY:$EMQX_API_SECRET" http://emqx:18083/api/v5/metrics

# Counters worth watching in /api/v5/metrics:
#   messages.publish     - messages published
#   messages.delivered   - messages delivered to subscribers
#   messages.dropped     - messages dropped
# Gauges such as connections.count, subscriptions.count and retained.count
# come from /api/v5/stats; current message rates from /api/v5/monitor_current.
curl -s -u "$EMQX_API_KEY:$EMQX_API_SECRET" http://emqx:18083/api/v5/stats

# Mosquitto statistics ($SYS topics); single quotes stop the shell expanding $SYS
mosquitto_sub -h localhost -t '$SYS/#' -v

褚成志的IoT笔记