MQTT 协议原理与生态
协议简介
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,专为低带宽、高延迟或不可靠网络环境设计。由 IBM 的 Andy Stanford-Clark 和 Arlen Nipper 于 1999 年发明,2014 年成为 OASIS 标准。
当前版本:
MQTT 3.1.1 → 最广泛使用,2014 年 OASIS 标准
MQTT 5.0 → 2019 年发布,增加大量企业特性
MQTT-SN → 面向传感器网络(UDP/ZigBee)核心架构
发布者 (Publisher) Broker(消息代理) 订阅者 (Subscriber)
设备 A ──PUBLISH──► ┌─────────────────┐ ──PUSH──► 应用服务器
设备 B ──PUBLISH──► │ MQTT Broker │ ──PUSH──► 移动 App
传感器 ──PUBLISH──► │ (EMQ X/Mosquitto│ ──PUSH──► 数据库写入器
└─────────────────┘
▲
SUBSCRIBE
│
订阅者注册感兴趣的主题核心概念
主题(Topic)
主题是 MQTT 消息路由的基础,使用 / 分隔的层级字符串:
工业 IoT 主题设计规范:
factory/line1/machine001/temperature # 具体设备数据
factory/line1/machine001/status # 设备状态
factory/+/machine001/temperature # 单层通配符(+)
factory/# # 多层通配符(#)
$SYS/broker/clients/connected # 系统主题(只读)
储能系统主题示例:
ess/site001/bms/001/soc # 电池 SOC
ess/site001/bms/001/voltage # 电池电压
ess/site001/pcs/001/power # PCS 功率
ess/site001/ems/setpoint # EMS 设定值
ess/site001/alert/+ # 所有告警QoS 服务质量
| QoS 级别 | 名称 | 保证 | 适用场景 |
|---|---|---|---|
| QoS 0 | At most once | 最多一次,可能丢失 | 高频传感器数据、允许丢失 |
| QoS 1 | At least once | 至少一次,可能重复 | 告警、状态变更 |
| QoS 2 | Exactly once | 恰好一次,无重复无丢失 | 计费数据、控制指令 |
QoS 1 消息流程:
Publisher → PUBLISH(id=1) → Broker → PUBLISH(id=1) → Subscriber
Publisher ← PUBACK(id=1) ← Broker ← PUBACK(id=1) ← Subscriber
QoS 2 消息流程(四次握手):
Publisher → PUBLISH(id=1) → Broker
Publisher ← PUBREC(id=1) ← Broker
Publisher → PUBREL(id=1) → Broker
Publisher ← PUBCOMP(id=1) ← Broker保留消息(Retained Message)
python
# 发布保留消息:新订阅者立即收到最新值
client.publish("device/001/status", "online", retain=True)
# 清除保留消息
client.publish("device/001/status", "", retain=True)遗嘱消息(Last Will and Testament)
python
import paho.mqtt.client as mqtt
client = mqtt.Client(client_id="device-001")
# 设置遗嘱:连接异常断开时自动发布
client.will_set(
topic="device/001/status",
payload='{"status":"offline","reason":"unexpected"}',
qos=1,
retain=True
)
client.connect("broker.example.com", 1883)MQTT 5.0 新特性
原因码(Reason Codes)
MQTT 5.0 为所有 ACK 报文添加了详细原因码,便于调试:
0x00 Success
0x04 Disconnect with Will Message
0x80 Unspecified error
0x83 Implementation specific error
0x87 Not authorized
0x90 Topic Name invalid
0x97 Quota exceeded
0x9E Shared Subscriptions not supported用户属性(User Properties)
python
# MQTT 5.0 用户属性(类似 HTTP Header)
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
props = Properties(PacketTypes.PUBLISH)
props.UserProperty = [
("device-type", "bms"),
("firmware-version", "2.1.0"),
("site-id", "site001")
]
client.publish("ess/data", payload, properties=props)共享订阅(Shared Subscriptions)
# 多个消费者共享订阅,实现负载均衡
$share/group1/factory/+/data
# 三个消费者实例订阅同一主题,消息轮询分发
Consumer-1: SUBSCRIBE $share/workers/sensor/+/data
Consumer-2: SUBSCRIBE $share/workers/sensor/+/data
Consumer-3: SUBSCRIBE $share/workers/sensor/+/data消息过期(Message Expiry Interval)
python
props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 300 # 5 分钟后过期
client.publish("sensor/temperature", "25.6", properties=props)主流 Broker 对比
| Broker | 语言 | 特点 | 适用规模 |
|---|---|---|---|
| EMQX | Erlang | 高并发,集群,企业功能完整 | 百万级连接 |
| Mosquitto | C | 轻量,资源占用极低 | 嵌入式/小规模 |
| HiveMQ | Java | 企业级,插件丰富 | 企业级 |
| VerneMQ | Erlang | 分布式,高可用 | 中大规模 |
| NanoMQ | C | 超轻量,边缘计算 | 嵌入式边缘 |
EMQX 集群部署
yaml
# docker-compose.yml - EMQX 三节点集群
version: '3'
services:
emqx1:
image: emqx/emqx:5.3.0
environment:
- EMQX_NODE_NAME=emqx@node1.emqx.io
- EMQX_CLUSTER__DISCOVERY_STRATEGY=static
- EMQX_CLUSTER__STATIC__SEEDS=emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io
ports:
- "1883:1883"
- "8083:8083"
- "18083:18083"
emqx2:
image: emqx/emqx:5.3.0
environment:
- EMQX_NODE_NAME=emqx@node2.emqx.io
- EMQX_CLUSTER__DISCOVERY_STRATEGY=static
- EMQX_CLUSTER__STATIC__SEEDS=emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io
emqx3:
image: emqx/emqx:5.3.0
environment:
- EMQX_NODE_NAME=emqx@node3.emqx.io
- EMQX_CLUSTER__DISCOVERY_STRATEGY=static
- EMQX_CLUSTER__STATIC__SEEDS=emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io安全机制
TLS/SSL 加密
bash
# 生成 CA 和服务器证书
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 3650 -in server.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out server.crtpython
# Python 客户端 TLS 连接
import ssl
import paho.mqtt.client as mqtt
client = mqtt.Client()
client.tls_set(
ca_certs="ca.crt",
certfile="client.crt",
keyfile="client.key",
tls_version=ssl.PROTOCOL_TLS
)
client.connect("broker.example.com", 8883)认证与授权
EMQX 认证方式:
├── 用户名/密码(内置数据库、MySQL、Redis)
├── JWT Token
├── X.509 客户端证书
└── HTTP 外部认证
ACL 规则示例(EMQX):
{allow, {user, "device-001"}, publish, ["device/001/#"]}.
{allow, {user, "device-001"}, subscribe, ["device/001/cmd"]}.
{deny, all}.协议报文结构
MQTT 固定头(2字节起):
┌─────────────────────────────────────────┐
│ Byte 1: 报文类型(4bit) + 标志位(4bit) │
│ Byte 2+: 剩余长度(变长编码) │
└─────────────────────────────────────────┘
CONNECT 报文关键字段:
- Protocol Name: "MQTT"
- Protocol Level: 4(3.1.1) / 5(5.0)
- Connect Flags: Clean Session, Will Flag, QoS, Retain, Password, Username
- Keep Alive: 心跳间隔(秒)
- Client Identifier: 客户端唯一标识生态工具
开发工具:
├── MQTTX → 跨平台 GUI 客户端(推荐)
├── mosquitto_pub/sub → 命令行工具
├── MQTT Explorer → 主题树可视化
└── mqtt-spy → 调试分析
客户端库:
├── Python: paho-mqtt
├── Java: Eclipse Paho / HiveMQ Client
├── Go: eclipse/paho.mqtt.golang
├── C/C++: Eclipse Paho C
├── JS: MQTT.js
└── Rust: rumqttc
集成框架:
├── Node-RED → 可视化流程编排
├── Home Assistant → 智能家居
├── ThingsBoard → IoT 平台
└── EMQX Rule Engine → 数据处理规则引擎