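MQTT Troubleshooting Case Studies
Case 1: A Message Storm Crashes the Broker
Symptoms
At 8:00 one morning, the CPU of the production EMQX broker suddenly spiked to 100%. Large numbers of client connections timed out and dropped, and the system collapsed in a cascading failure.
Investigation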
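```bash
# Check the broker's current message rates
# (assumption: EMQX 5.x, where /api/v5/monitor_current reports per-second rates;
#  admin:public are the default Dashboard credentials, while EMQX 5.x REST calls
#  normally authenticate with an API key:secret pair)
curl -s -u admin:public http://emqx:18083/api/v5/monitor_current | jq '.received_msg_rate'
# Returned: 458000  ← about 458,000 inbound messages/s; the normal level is about 5,000

# Find the clients that publish the most. From the broker's side, messages a client
# publishes are counted in recv_msg (send_msg counts messages delivered TO the client).
# Quote the URL so the shell does not treat "?" as a glob character.
curl -s -u admin:public "http://emqx:18083/api/v5/clients?limit=1000" | \
  jq '.data | sort_by(.recv_msg) | reverse | .[0:5] | map({clientid, recv_msg})'
# The device-factory-line3-* clients stand out:
# each device is publishing 1,000+ messages per second

# Inspect one suspect client (subscription count and messages it has published)
curl -s -u admin:public "http://emqx:18083/api/v5/clients/device-factory-line3-001" | \
  jq '.subscriptions_cnt, .recv_msg'
```
Root Cause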
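Device firmware bug: after a reconnect triggered by network jitter, the backoff logic in the publish loop stopped working, and the device fell into an unbounded, tight resend loop. The devices also publish with QoS 1, so the broker must return a PUBACK for every message, which multiplies its load.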
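The root cause is a retry loop whose backoff stopped working. The following is a minimal sketch of a capped exponential backoff with "full jitter", which applies equally to reconnects and resends. It is not the devices' actual firmware, and the function names and the base/cap defaults are illustrative assumptions. The jitter spreads a fleet's retries out so the devices do not hammer the broker in lockstep.

```python
import random
import time


def backoff_delays(base=1.0, cap=60.0, rng=random.random):
    """Yield retry delays using capped exponential backoff with full jitter.

    Attempt n sleeps a random time in [0, min(cap, base * 2**n)), so the upper
    bound doubles per attempt until it reaches `cap` and then stays there.
    """
    attempt = 0
    while True:
        upper = min(cap, base * (2 ** attempt))
        yield rng() * upper
        if upper < cap:  # stop growing once capped (also avoids float overflow)
            attempt += 1


def retry_with_backoff(operation, max_attempts=8, **backoff_kwargs):
    """Call `operation` until it succeeds; re-raise after max_attempts failures."""
    delays = backoff_delays(**backoff_kwargs)
    for attempt in range(max_attempts):
        try:
            return operation()
        except OSError:  # socket errors such as ConnectionRefusedError
            if attempt == max_attempts - 1:
                raise
            time.sleep(next(delays))
```

With a paho-mqtt client, `operation` could be `client.reconnect`. paho's own network loop exposes a comparable setting through `Client.reconnect_delay_set(min_delay, max_delay)`.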
Solution
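```bash
# Emergency response: stop the flood at the broker

# 1. Kick the misbehaving client via the API
curl -X DELETE -u admin:public \
  http://emqx:18083/api/v5/clients/device-factory-line3-001
# A kicked device usually reconnects at once; EMQX's ban list (/api/v5/banned)
# can keep it out until the firmware is fixed

# Assumption for steps 2 and 3: listener-level rate limiting as in EMQX 5.1+;
# the limiter syntax changed within 5.x, so check the config manual for your version
# 2. Limit each client's publish rate
# emqx.conf:
# listeners.tcp.default.messages_rate = "1000/s"   # at most 1,000 messages/s per client

# 3. Limit the listener's connection rate so a reconnect storm cannot swamp the broker
# listeners.tcp.default.max_conn_rate = "1000/s"
```
Firmware fix: besides repairing the retry backoff, cap the device's publish rate so that a retry bug can no longer flood the broker. This class enforces a minimum interval between publishes:
```python
import time


class PublishRateLimiter:
    """Caps the publish rate by enforcing a minimum interval between publishes."""

    def __init__(self, max_rate=10):  # at most 10 messages per second
        self.max_rate = max_rate
        self.min_interval = 1.0 / max_rate
        self.last_publish = float("-inf")  # first publish is never delayed

    def publish(self, client, topic, payload, qos=1):
        # monotonic() is immune to wall-clock jumps (e.g. NTP corrections)
        now = time.monotonic()
        elapsed = now - self.last_publish
        if elapsed < self.min_interval:
            # Too soon after the previous publish: block until the interval has passed
            time.sleep(self.min_interval - elapsed)
        client.publish(topic, payload, qos=qos)
        self.last_publish = time.monotonic()
```
Case 2: Duplicate QoS 1 Messages Re-execute Control Commands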
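Symptoms
The PCS (power conversion system) of an energy-storage system received the same "start charging" command several times. The control logic misbehaved and a protection device tripped.
Investigation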
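```python
# Found in the message-handling log:
# [10:23:45.123] Received cmd: start_charge, msg_id=1001
# [10:23:45.891] Received cmd: start_charge, msg_id=1001  ← duplicate!
# [10:23:46.234] Received cmd: start_charge, msg_id=1001  ← duplicate again!

# Cause: network jitter caused PUBACKs to be lost, so the broker redelivered the
# QoS 1 message. QoS 1 guarantees at-least-once delivery, so duplicates are legal,
# but the device did not handle commands idempotently.
```
Solution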
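QoS 1 is at-least-once by design, so duplicates will recur and the device has to tolerate them. The handler that follows deduplicates on a fingerprint of topic, payload, and packet ID. A sturdier key is an identifier carried by the command itself. The minimal sketch below assumes the publisher adds a hypothetical `cmd_id` field (for example a UUID) to every JSON command. The class and field names are illustrative and are not part of any MQTT library.

```python
import json
import time
from collections import OrderedDict


class CommandDeduplicator:
    """Remembers recently seen command IDs and rejects repeats within a window."""

    def __init__(self, window_s=300.0, max_entries=10_000, clock=time.monotonic):
        self.window_s = window_s
        self.max_entries = max_entries
        self.clock = clock
        self._seen = OrderedDict()  # cmd_id -> first-seen time, oldest first

    def _evict(self, now):
        # Entries are inserted in time order, so expired ones sit at the front.
        # Also evict the oldest entry when full, keeping memory bounded.
        while self._seen:
            seen_at = next(iter(self._seen.values()))
            expired = now - seen_at > self.window_s
            full = len(self._seen) >= self.max_entries
            if not (expired or full):
                break
            self._seen.popitem(last=False)

    def accept(self, payload: bytes) -> bool:
        """Return True if the command is new and should be executed."""
        cmd = json.loads(payload)
        cmd_id = cmd["cmd_id"]  # hypothetical field added by the publisher
        now = self.clock()
        self._evict(now)
        if cmd_id in self._seen:
            return False
        self._seen[cmd_id] = now
        return True
```

In `on_message`, call `accept(msg.payload)` and execute the command only when it returns True. Because the ID is recorded before execution, a command that fails midway is not retried automatically. That is usually the safer default for control commands such as "start charging".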
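```python
import hashlib
import json
import logging
import time
from collections import OrderedDict


class IdempotentCommandHandler:
    """Idempotent command handler that prevents repeated execution."""

    def __init__(self, dedup_window=300):
        self.dedup_window = dedup_window  # dedup window in seconds
        self.processed_ids = OrderedDict()  # fingerprint -> first-seen time, oldest first

    def _generate_msg_fingerprint(self, msg) -> str:
        """Build a message fingerprint from topic, payload and packet ID.

        MQTT packet identifiers are recycled once acknowledged, so the mid only
        matches redeliveries of the same packet; an application-level command ID
        in the payload is a more robust key.
        """
        content = f"{msg.topic}:{msg.payload}:{msg.mid}"
        return hashlib.md5(content.encode()).hexdigest()

    def handle(self, msg, handler_func):
        fingerprint = self._generate_msg_fingerprint(msg)
        now = time.monotonic()

        # Drop expired records; entries are in time order, so pop from the front
        while self.processed_ids:
            oldest = next(iter(self.processed_ids))
            if now - self.processed_ids[oldest] <= self.dedup_window:
                break
            self.processed_ids.popitem(last=False)

        # Skip duplicates
        if fingerprint in self.processed_ids:
            logging.warning("Duplicate message detected, skipping: %s", fingerprint)
            return

        # Record, then execute
        self.processed_ids[fingerprint] = now
        handler_func(msg)


# Usage example
dedup_handler = IdempotentCommandHandler(dedup_window=60)


def execute_command(msg):
    cmd = json.loads(msg.payload)
    # Run the control logic...


def on_command(client, userdata, msg):
    dedup_handler.handle(msg, execute_command)
```
Case 3: A Persistent-Session Backlog Crashes the Device After Reconnect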
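Symptoms
A device came back online after 48 hours offline for maintenance. It was immediately hit by a large backlog of queued messages, ran out of memory, and crashed.
Investigation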
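```bash
# Check how many messages EMQX has queued for this client's offline session
curl -s -u admin:public \
  "http://emqx:18083/api/v5/clients/device-001" | \
  jq '.mqueue_len, .mqueue_max'
# mqueue_len returned 48000  ← about 48,000 queued messages
# The device has only 64 MB of RAM and cannot absorb that many messages at once
```
Solution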
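The device-side step 3 below calls for processing messages in batches after reconnecting. Here is a stdlib-only sketch of that idea; the class name and sizes are illustrative assumptions. A bounded `deque` discards the oldest message when full, so memory stays capped no matter how large the backlog is. The worker then removes at most `batch_size` messages per call.

```python
import threading
from collections import deque


class BatchBuffer:
    """Bounded, thread-safe buffer that is drained in fixed-size batches."""

    def __init__(self, max_len=2000):
        # deque(maxlen=...) silently discards the oldest item when full
        self._items = deque(maxlen=max_len)
        self._lock = threading.Lock()
        self.dropped = 0

    def put(self, item):
        with self._lock:
            if len(self._items) == self._items.maxlen:
                self.dropped += 1  # the append below evicts the oldest item
            self._items.append(item)

    def drain(self, batch_size=50):
        """Remove and return up to batch_size items, oldest first."""
        with self._lock:
            n = min(batch_size, len(self._items))
            return [self._items.popleft() for _ in range(n)]
```

`put()` can safely be called from paho-mqtt's network thread. A worker that calls `drain(50)` once per second handles at most 50 messages per second, and `dropped` shows how much of the surge was shed.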
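```bash
# 1. Discard the backlog. Kicking the client also discards its session and the
#    messages queued in it (EMQX behavior; confirm on your version first)
curl -X DELETE -u admin:public \
  "http://emqx:18083/api/v5/clients/device-001"

# 2. Cap the offline message queue at a sensible length
# emqx.conf:
# mqtt.max_mqueue_len = 1000            # queue at most 1,000 messages per session
# mqtt.mqueue_default_priority = lowest # accepted values: highest | lowest
# mqtt.mqueue_store_qos0 = false        # do not queue QoS 0 messages
# Optionally shorten mqtt.session_expiry_interval so long-offline sessions expire

# 3. Device side: throttle message processing after reconnecting
```
paho-mqtt invokes `on_message` on its own network thread, while `asyncio.Queue` is not thread-safe, so the processor hands each message to the event loop with `call_soon_threadsafe`:
```python
import asyncio
import logging


class ThrottledMessageProcessor:
    """Rate-limited processor that absorbs the message surge after a reconnect."""

    def __init__(self, loop: asyncio.AbstractEventLoop, max_rate=50):  # at most 50 msgs/s
        self.loop = loop
        self.max_rate = max_rate
        self.queue = asyncio.Queue(maxsize=2000)  # bounds memory use

    async def process_message(self, msg):
        ...  # application-specific handling goes here

    async def process_loop(self):
        interval = 1.0 / self.max_rate
        while True:
            msg = await self.queue.get()
            await self.process_message(msg)
            await asyncio.sleep(interval)

    def _enqueue(self, msg):
        # Runs on the event loop thread, so the queue is touched from one thread only
        if self.queue.full():
            logging.warning("Processing queue full, dropping oldest message")
            self.queue.get_nowait()  # drop the oldest message
        self.queue.put_nowait(msg)

    def on_message(self, client, userdata, msg):
        # Called from paho's network thread: hand off to the event loop thread
        self.loop.call_soon_threadsafe(self._enqueue, msg)
```
Case 4: TLS Handshake Failure Caused by an Expired Certificate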
Symptoms
On a single day, every device suddenly became unable to connect to the broker, and the logs showed TLS handshake failures.
Investigation
```bash
# Check the certificate's validity period
openssl x509 -in /etc/mqtt/server.crt -noout -dates
# notBefore=Jan 1 00:00:00 2023 GMT
# notAfter=Jan 1 00:00:00 2024 GMT  ← expired!

# Verify a live connection
openssl s_client -connect broker.example.com:8883 -CAfile ca.crt
# Verify return code: 10 (certificate has expired)
```
Solution
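The openssl checks above can also run from Python on a monitoring host. This minimal sketch uses only the standard `ssl` and `socket` modules. The host, port, CA file, and 30-day threshold are assumptions carried over from this case. `getpeercert()` returns the parsed certificate only after verification succeeds. An already-expired certificate therefore surfaces as `ssl.SSLCertVerificationError` rather than as a negative day count.

```python
import socket
import ssl
import time

WARN_DAYS = 30  # assumed alert threshold, matching the shell script below


def days_left(not_after, now=None):
    """Whole days until a certificate's notAfter time, e.g. 'Jan  1 00:00:00 2024 GMT'."""
    expiry = ssl.cert_time_to_seconds(not_after)  # seconds since the epoch (UTC)
    if now is None:
        now = time.time()
    return int((expiry - now) // 86400)


def check_broker_cert(host="broker.example.com", port=8883, cafile="ca.crt"):
    """Open a verified TLS connection and return the days left on the server cert."""
    ctx = ssl.create_default_context(cafile=cafile)
    with socket.create_connection((host, port), timeout=10) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            cert = tls.getpeercert()
    return days_left(cert["notAfter"])
```

Run `check_broker_cert()` from cron. Alert when it returns fewer than `WARN_DAYS` days, or when it raises `ssl.SSLCertVerificationError`, whose `verify_message` then reads "certificate has expired".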
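```bash
# 1. Issue a new certificate right away
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr \
  -subj "/CN=broker.example.com/O=MyOrg"
# Most TLS clients match the hostname against subjectAltName rather than CN,
# so add a SAN when signing (the <(...) process substitution requires bash)
openssl x509 -req -days 3650 -in server.csr \
  -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt \
  -extfile <(printf "subjectAltName=DNS:broker.example.com")
cp server.crt server.key /etc/emqx/certs/

# 2. Hot-reload the EMQX certificate (no restart required).
#    A new key was generated too, so update keyfile together with certfile.
#    Depending on the EMQX version, this PUT may require the full listener config.
curl -X PUT -u admin:public \
  http://emqx:18083/api/v5/listeners/ssl:default \
  -H "Content-Type: application/json" \
  -d '{"ssl_options": {"certfile": "/etc/emqx/certs/server.crt", "keyfile": "/etc/emqx/certs/server.key"}}'

# 3. Set up certificate-expiry monitoring
cat > /usr/bin/cert-monitor.sh <<'EOF'
#!/bin/bash
CERT_FILE="/etc/mqtt/server.crt"
WARN_DAYS=30
EXPIRY=$(openssl x509 -in "$CERT_FILE" -noout -enddate | cut -d= -f2)
EXPIRY_EPOCH=$(date -d "$EXPIRY" +%s)   # GNU date syntax
NOW_EPOCH=$(date +%s)
DAYS_LEFT=$(( (EXPIRY_EPOCH - NOW_EPOCH) / 86400 ))
if [ "$DAYS_LEFT" -lt "$WARN_DAYS" ]; then
  echo "WARNING: Certificate expires in $DAYS_LEFT days!"
  # Send an alert...
  exit 1
fi
EOF
chmod +x /usr/bin/cert-monitor.sh

# 4. For a public-facing broker, let Let's Encrypt renew automatically
certbot renew --pre-hook "..." --post-hook "systemctl reload emqx"
```
Common Error Codes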
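| Error | Cause | Fix |
|---|---|---|
| Connection Refused (rc=1) | Unacceptable protocol version | Check the client's MQTT version setting |
| Connection Refused (rc=2) | Client ID rejected | Check the Client ID's length and characters |
| Connection Refused (rc=3) | Broker unavailable | Check the broker service status |
| Connection Refused (rc=4) | Bad username or password | Check the authentication configuration |
| Connection Refused (rc=5) | Not authorized | Check the client's authorization/ACL rules |
| Disconnect (reason code 141, 0x8D) | Keep Alive timeout | Increase keepalive or check the network |
| TLS Handshake Failed | Certificate problem | Check the certificate validity dates and the CA chain |
| Packet too large (reason code 149, 0x95) | Message exceeds the broker's size limit | Split the payload or raise max_packet_size |

rc=1 to rc=5 are MQTT 3.1.1 CONNACK return codes; 141 (0x8D) and 149 (0x95) are MQTT 5 reason codes.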
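When a log shows only the number, a small lookup table saves a trip to the specification. This sketch covers the MQTT 3.1.1 CONNACK return codes and a subset of the MQTT 5 reason codes. The reason strings follow the two specifications, and the function name is illustrative.

```python
# MQTT 3.1.1 CONNACK return codes
MQTT311_CONNACK = {
    0: "Connection accepted",
    1: "Unacceptable protocol version",
    2: "Identifier rejected",
    3: "Server unavailable",
    4: "Bad user name or password",
    5: "Not authorized",
}

# A subset of MQTT 5 reason codes (CONNACK, DISCONNECT and other packets)
MQTT5_REASON = {
    0x00: "Success / Normal disconnection",
    0x80: "Unspecified error",
    0x81: "Malformed Packet",
    0x82: "Protocol Error",
    0x84: "Unsupported Protocol Version",
    0x85: "Client Identifier not valid",
    0x86: "Bad User Name or Password",
    0x87: "Not authorized",
    0x88: "Server unavailable",
    0x89: "Server busy",
    0x8A: "Banned",
    0x8D: "Keep Alive timeout",
    0x8E: "Session taken over",
    0x95: "Packet too large",
    0x96: "Message rate too high",
}


def describe_code(code: int, mqtt5: bool = False) -> str:
    """Translate a numeric return/reason code into its specification name."""
    table = MQTT5_REASON if mqtt5 else MQTT311_CONNACK
    return table.get(code, f"Unknown code {code} (0x{code:02X})")
```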
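Debugging Commands
```bash
# Subscribe to all topics (for debugging; '#' does not match $SYS/ topics)
mosquitto_sub -h broker -t '#' -v

# Publish a test message
mosquitto_pub -h broker -t 'test/hello' -m 'world' -q 1

# Follow EMQX logs in real time (docker logs writes to both stdout and stderr;
# EMQX prints lowercase levels such as [error] and [warning])
docker logs -f emqx 2>&1 | grep -iE "error|warn"

# Capture MQTT traffic (traffic on 8883 is TLS-encrypted and unreadable without keys)
tcpdump -i eth0 -w mqtt.pcap port 1883
# Open the capture in Wireshark and apply the display filter: mqtt

# Load test with 1,000 QoS 1 publishers. In emqtt_bench, -n sets the starting
# client-ID number and -L limits the number of messages published
# (check `emqtt_bench pub --help` for your version)
emqtt_bench pub -h broker -t 'bench/%i' -c 1000 -L 100 -q 1
```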