Skip to content

MQTT 协议原理与生态

协议简介

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,专为低带宽、高延迟或不可靠网络环境设计。由 IBM 的 Andy Stanford-Clark 和 Arlen Nipper 于 1999 年发明,2014 年成为 OASIS 标准。

当前版本:
  MQTT 3.1.1  → 最广泛使用,2014 年 OASIS 标准
  MQTT 5.0    → 2019 年发布,增加大量企业特性
  MQTT-SN     → 面向传感器网络(UDP/ZigBee)

核心架构

发布者 (Publisher)          Broker(消息代理)         订阅者 (Subscriber)
                                                      
  设备 A ──PUBLISH──►  ┌─────────────────┐  ──PUSH──► 应用服务器
  设备 B ──PUBLISH──►  │   MQTT Broker   │  ──PUSH──► 移动 App
  传感器 ──PUBLISH──►  │  (EMQ X/Mosquitto│  ──PUSH──► 数据库写入器
                       └─────────────────┘

                         SUBSCRIBE

                         订阅者注册感兴趣的主题

核心概念

主题(Topic)

主题是 MQTT 消息路由的基础,使用 / 分隔的层级字符串:

工业 IoT 主题设计规范:

factory/line1/machine001/temperature    # 具体设备数据
factory/line1/machine001/status         # 设备状态
factory/+/machine001/temperature        # 单层通配符(+)
factory/#                               # 多层通配符(#)
$SYS/broker/clients/connected           # 系统主题(只读)

储能系统主题示例:
ess/site001/bms/001/soc                 # 电池 SOC
ess/site001/bms/001/voltage             # 电池电压
ess/site001/pcs/001/power               # PCS 功率
ess/site001/ems/setpoint                # EMS 设定值
ess/site001/alert/+                     # 所有告警

QoS 服务质量

QoS 级别名称保证适用场景
QoS 0At most once最多一次,可能丢失高频传感器数据、允许丢失
QoS 1At least once至少一次,可能重复告警、状态变更
QoS 2Exactly once恰好一次,无重复无丢失计费数据、控制指令
QoS 1 消息流程:
Publisher → PUBLISH(id=1) → Broker → PUBLISH(id=1) → Subscriber
Publisher ← PUBACK(id=1) ← Broker ← PUBACK(id=1) ← Subscriber

QoS 2 消息流程(四次握手):
Publisher → PUBLISH(id=1)  → Broker
Publisher ← PUBREC(id=1)   ← Broker
Publisher → PUBREL(id=1)   → Broker
Publisher ← PUBCOMP(id=1)  ← Broker

保留消息(Retained Message)

python
# 发布保留消息:新订阅者立即收到最新值
client.publish("device/001/status", "online", retain=True)

# 清除保留消息
client.publish("device/001/status", "", retain=True)

遗嘱消息(Last Will and Testament)

python
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="device-001")

# 设置遗嘱:连接异常断开时自动发布
client.will_set(
    topic="device/001/status",
    payload='{"status":"offline","reason":"unexpected"}',
    qos=1,
    retain=True
)
client.connect("broker.example.com", 1883)

MQTT 5.0 新特性

原因码(Reason Codes)

MQTT 5.0 为所有 ACK 报文添加了详细原因码,便于调试:

0x00 Success
0x04 Disconnect with Will Message
0x80 Unspecified error
0x83 Implementation specific error
0x87 Not authorized
0x90 Topic Name invalid
0x97 Quota exceeded
0x9E Shared Subscriptions not supported

用户属性(User Properties)

python
# MQTT 5.0 用户属性(类似 HTTP Header)
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

props = Properties(PacketTypes.PUBLISH)
props.UserProperty = [
    ("device-type", "bms"),
    ("firmware-version", "2.1.0"),
    ("site-id", "site001")
]
client.publish("ess/data", payload, properties=props)

共享订阅(Shared Subscriptions)

# 多个消费者共享订阅,实现负载均衡
$share/group1/factory/+/data

# 三个消费者实例订阅同一主题,消息轮询分发
Consumer-1: SUBSCRIBE $share/workers/sensor/+/data
Consumer-2: SUBSCRIBE $share/workers/sensor/+/data
Consumer-3: SUBSCRIBE $share/workers/sensor/+/data

消息过期(Message Expiry Interval)

python
props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 300  # 5 分钟后过期

client.publish("sensor/temperature", "25.6", properties=props)

主流 Broker 对比

Broker语言特点适用规模
EMQXErlang高并发,集群,企业功能完整百万级连接
MosquittoC轻量,资源占用极低嵌入式/小规模
HiveMQJava企业级,插件丰富企业级
VerneMQErlang分布式,高可用中大规模
NanoMQC超轻量,边缘计算嵌入式边缘

EMQX 集群部署

yaml
# docker-compose.yml - EMQX 三节点集群
version: '3'
services:
  emqx1:
    image: emqx/emqx:5.3.0
    environment:
      - EMQX_NODE_NAME=emqx@node1.emqx.io
      - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
      - EMQX_CLUSTER__STATIC__SEEDS=emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io
    ports:
      - "1883:1883"
      - "8083:8083"
      - "18083:18083"

  emqx2:
    image: emqx/emqx:5.3.0
    environment:
      - EMQX_NODE_NAME=emqx@node2.emqx.io
      - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
      - EMQX_CLUSTER__STATIC__SEEDS=emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io

  emqx3:
    image: emqx/emqx:5.3.0
    environment:
      - EMQX_NODE_NAME=emqx@node3.emqx.io
      - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
      - EMQX_CLUSTER__STATIC__SEEDS=emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io

安全机制

TLS/SSL 加密

bash
# 生成 CA 和服务器证书
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt

openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 3650 -in server.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -out server.crt
python
# Python 客户端 TLS 连接
import ssl
import paho.mqtt.client as mqtt

client = mqtt.Client()
client.tls_set(
    ca_certs="ca.crt",
    certfile="client.crt",
    keyfile="client.key",
    tls_version=ssl.PROTOCOL_TLS
)
client.connect("broker.example.com", 8883)

认证与授权

EMQX 认证方式:
├── 用户名/密码(内置数据库、MySQL、Redis)
├── JWT Token
├── X.509 客户端证书
└── HTTP 外部认证

ACL 规则示例(EMQX):
{allow, {user, "device-001"}, publish, ["device/001/#"]}.
{allow, {user, "device-001"}, subscribe, ["device/001/cmd"]}.
{deny, all}.

协议报文结构

MQTT 固定头(2字节起):
┌─────────────────────────────────────────┐
│ Byte 1: 报文类型(4bit) + 标志位(4bit)   │
│ Byte 2+: 剩余长度(变长编码)            │
└─────────────────────────────────────────┘

CONNECT 报文关键字段:
- Protocol Name: "MQTT"
- Protocol Level: 4(3.1.1) / 5(5.0)
- Connect Flags: Clean Session, Will Flag, QoS, Retain, Password, Username
- Keep Alive: 心跳间隔(秒)
- Client Identifier: 客户端唯一标识

生态工具

开发工具:
├── MQTTX          → 跨平台 GUI 客户端(推荐)
├── mosquitto_pub/sub → 命令行工具
├── MQTT Explorer  → 主题树可视化
└── mqtt-spy       → 调试分析

客户端库:
├── Python: paho-mqtt
├── Java:   Eclipse Paho / HiveMQ Client
├── Go:     eclipse/paho.mqtt.golang
├── C/C++:  Eclipse Paho C
├── JS:     MQTT.js
└── Rust:   rumqttc

集成框架:
├── Node-RED       → 可视化流程编排
├── Home Assistant → 智能家居
├── ThingsBoard    → IoT 平台
└── EMQX Rule Engine → 数据处理规则引擎

褚成志的IoT笔记