
MQTT Troubleshooting Case Studies

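Case 1: Message Storm Crashes the Broker

Symptoms

At 8 a.m. one morning, CPU usage on the production EMQX broker suddenly spiked to 100%. Large numbers of client connections timed out and dropped, and the failure cascaded through the rest of the system.

Investigation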

bash
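# Check EMQX metrics
curl -u admin:public http://emqx:18083/api/v5/metrics | jq '.messages_publish_rate'
# Returned: 458000  ← about 458,000 messages per second; the normal rate is around 5,000

# Find the clients that publish the most
# (recv_msg counts the messages the broker has received from each client;
#  the URL is quoted so the shell does not treat "?" as a glob character)
curl -u admin:public "http://emqx:18083/api/v5/clients?limit=10" | \
  jq '.data | sort_by(.recv_msg) | reverse | .[0:5]'

# The device-factory-line3-* clients stood out:
# each device was publishing more than 1,000 messages per second

# Inspect the counters of one affected client
curl -u admin:public "http://emqx:18083/api/v5/clients/device-factory-line3-001" | \
  jq '.subscriptions_cnt, .recv_msg'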
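
The per-client counters are cumulative, so a long-lived but well-behaved client can outrank a device that only started flooding a few minutes ago. Comparing two snapshots gives a per-second rate instead. The following is a minimal sketch, assuming the snapshots have already been fetched and reduced to `{clientid: message_count}` dictionaries; `publish_rates` and the sample numbers are hypothetical.

```python
def publish_rates(before, after, interval_s):
    """Return (clientid, msgs_per_second) pairs, highest rate first.

    before/after map clientid -> cumulative message counter, sampled
    interval_s seconds apart. Clients missing from `before` (new
    connections) are treated as starting from 0.
    """
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    rates = {}
    for clientid, count in after.items():
        delta = count - before.get(clientid, 0)
        # A negative delta means the counter was reset (e.g. after a
        # reconnect), so fall back to the new count.
        if delta < 0:
            delta = count
        rates[clientid] = delta / interval_s
    return sorted(rates.items(), key=lambda kv: kv[1], reverse=True)


# Hypothetical counter snapshots taken 10 seconds apart.
before = {"device-factory-line3-001": 120_000, "sensor-42": 900}
after = {"device-factory-line3-001": 131_500, "sensor-42": 950}
top = publish_rates(before, after, interval_s=10)
print(top[0])
```

Ranking by delta rather than by total makes a freshly misbehaving device visible even when its lifetime count is still small.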

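Root Cause

A device firmware bug: after reconnecting following network jitter, the backoff logic in the publish loop failed and the device entered an endless rapid-resend loop. The devices also published at QoS 1, so the broker had to answer every message with a PUBACK, which multiplied its load further.

Solution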

bash
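# Emergency mitigation: disconnect the misbehaving client and cap rates on the broker

# 1. Kick the misbehaving client through the API
curl -X DELETE -u admin:public \
  http://emqx:18083/api/v5/clients/device-factory-line3-001

# 2. Configure a per-client publish rate limit (EMQX 5.x)
# emqx.conf (option names vary between releases; recent 5.x versions use the
# per-listener messages_rate option, so check the reference for your version):
# mqtt.pub_limit = "1000/s"  # at most 1,000 messages per second per client

# 3. Configure a connection rate limit
# listeners.tcp.default.max_conn_rate = 1000

# Firmware fix: throttle publishing on the device, in addition to restoring backoff on reconnect
python
import time

class PublishRateLimiter:
    def __init__(self, max_rate=10):  # at most 10 messages per second
        self.max_rate = max_rate
        self.min_interval = 1.0 / max_rate
        self.last_publish = float("-inf")  # so the first publish is never delayed

    def publish(self, client, topic, payload, qos=1):
        # Use the monotonic clock so wall-clock adjustments cannot break the spacing
        elapsed = time.monotonic() - self.last_publish
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        client.publish(topic, payload, qos=qos)
        self.last_publish = time.monotonic()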
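
The rate limiter caps steady-state publishing, but the root cause was a missing backoff after reconnects. The following is a minimal sketch of exponential backoff with jitter, not the actual firmware fix; `backoff_delays`, `reconnect_with_backoff`, and all default values are hypothetical, and `connect` and `sleep` are injected so the loop can be exercised without a network.

```python
import random


def backoff_delays(base=1.0, factor=2.0, max_delay=60.0, jitter=0.2, rng=random):
    """Yield reconnect delays: base, base*factor, ... capped at max_delay.

    Each delay is scaled by a random factor in [1 - jitter, 1 + jitter] so
    that a fleet of devices does not retry in lockstep.
    """
    delay = base
    while True:
        yield delay * rng.uniform(1.0 - jitter, 1.0 + jitter)
        delay = min(delay * factor, max_delay)


def reconnect_with_backoff(connect, sleep, max_attempts=8):
    """Call connect() until it succeeds, sleeping between failed attempts.

    Returns the number of attempts used, or re-raises the last OSError
    once max_attempts is exhausted.
    """
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            connect()
            return attempt
        except OSError:
            if attempt == max_attempts:
                raise
            sleep(next(delays))
```

On a device, `connect` could wrap the MQTT client's reconnect call and `sleep` could be `time.sleep`. paho-mqtt users can also look at the client's built-in `reconnect_delay_set` option before writing their own loop.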

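Case 2: Duplicate QoS 1 Messages Cause Control Commands to Run Twice

Symptoms

The PCS (power conversion system) of an energy storage system received the "start charging" command more than once, which disrupted the control logic and tripped the protection device.

Investigation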

python
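# The message-handling log showed:
# [10:23:45.123] Received cmd: start_charge, msg_id=1001
# [10:23:45.891] Received cmd: start_charge, msg_id=1001  ← duplicate!
# [10:23:46.234] Received cmd: start_charge, msg_id=1001  ← duplicate again!

# Cause: network jitter dropped the PUBACK, so the broker retransmitted the QoS 1 message,
# and the device did not handle commands idempotently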

Solution

python
import hashlib
import json
import logging
import time
from collections import OrderedDict

class IdempotentCommandHandler:
    """Idempotent command handler that prevents duplicate execution."""

    def __init__(self, dedup_window=300):
        self.dedup_window = dedup_window  # deduplication window in seconds
        self.processed_ids = OrderedDict()  # fingerprint -> time first seen

    def _generate_msg_fingerprint(self, msg) -> str:
        """Build a fingerprint from the topic, payload, and packet ID."""
        content = f"{msg.topic}:{msg.payload}:{msg.mid}"
        return hashlib.md5(content.encode()).hexdigest()

    def handle(self, msg, handler_func):
        fingerprint = self._generate_msg_fingerprint(msg)
        now = time.time()

        # Purge expired records
        expired = [k for k, v in self.processed_ids.items()
                   if now - v > self.dedup_window]
        for k in expired:
            del self.processed_ids[k]

        # Skip the message if it was already processed
        if fingerprint in self.processed_ids:
            logging.warning(f"Duplicate message detected, skipping: {fingerprint}")
            return

        # Record, then process
        self.processed_ids[fingerprint] = now
        handler_func(msg)

# Usage example
dedup_handler = IdempotentCommandHandler(dedup_window=60)

def on_command(client, userdata, msg):
    dedup_handler.handle(msg, execute_command)

def execute_command(msg):
    cmd = json.loads(msg.payload)
    # Run the control logic...
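
MQTT packet identifiers are 16-bit values scoped to a single session and are reused over time, so a fingerprint built on them can miss a duplicate that arrives after a reconnect. Deduplicating on an ID chosen by the command issuer is usually more robust. The following is a minimal sketch, assuming the publisher adds a unique `cmd_id` field to the JSON payload; that field, `CommandIdDeduplicator`, and `handle_payload` are hypothetical and not part of the original system.

```python
import json
import time
from collections import OrderedDict


class CommandIdDeduplicator:
    """Remember recently seen command IDs for `window` seconds."""

    def __init__(self, window=300.0, clock=time.monotonic):
        self.window = window
        self.clock = clock
        self.seen = OrderedDict()  # cmd_id -> first-seen time, oldest first

    def is_new(self, cmd_id):
        now = self.clock()
        # Entries are inserted in time order, so expired ones sit at the front.
        while self.seen:
            oldest_id, seen_at = next(iter(self.seen.items()))
            if now - seen_at <= self.window:
                break
            del self.seen[oldest_id]
        if cmd_id in self.seen:
            return False
        self.seen[cmd_id] = now
        return True


def handle_payload(payload, dedup, execute):
    """Decode a JSON command and run execute(cmd) at most once per cmd_id."""
    cmd = json.loads(payload)
    if not dedup.is_new(cmd["cmd_id"]):  # "cmd_id" is an assumed payload field
        return False
    execute(cmd)
    return True
```

Because only the front of the ordered dictionary is inspected, cleanup cost stays proportional to the number of expired entries rather than to the size of the window.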

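Case 3: Persistent-Session Backlog Crashes a Device on Reconnect

Symptoms

A device came back online after 48 hours of offline maintenance, immediately received a large backlog of queued messages, and crashed with an out-of-memory error.

Investigation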

bash
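# Check this client's offline message queue in EMQX
curl -u admin:public \
  "http://emqx:18083/api/v5/clients/device-001/mqueue_messages" | \
  jq '.meta.count'
# Returned: 48000  ← a backlog of 48,000 messages

# The device has only 64 MB of memory and cannot absorb that many messages at once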

Solution

bash
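# 1. Clear this client's offline message queue
curl -X DELETE -u admin:public \
  "http://emqx:18083/api/v5/clients/device-001/mqueue_messages"

# 2. Set a reasonable cap on the offline message queue
# emqx.conf:
# mqtt.max_mqueue_len = 1000      # buffer at most 1,000 messages
# mqtt.mqueue_default_priority = lowest  # EMQX 5.x accepts highest | lowest
# mqtt.mqueue_store_qos0 = false  # do not buffer QoS 0 messages

# 3. On the device: process messages in a throttled loop after reconnecting
python
import asyncio
import logging

class ThrottledMessageProcessor:
    """Rate-limited message processor that absorbs the message burst after a reconnect."""

    def __init__(self, max_rate=50):  # process at most 50 messages per second
        self.max_rate = max_rate
        self.queue = asyncio.Queue(maxsize=2000)

    async def process_message(self, msg):
        """Application-specific handling; override in a subclass."""
        raise NotImplementedError

    async def process_loop(self):
        interval = 1.0 / self.max_rate
        while True:
            msg = await self.queue.get()
            await self.process_message(msg)
            await asyncio.sleep(interval)

    def on_message(self, client, userdata, msg):
        # asyncio.Queue is not thread-safe: this callback must run on the event-loop
        # thread. From another thread, schedule it with loop.call_soon_threadsafe.
        try:
            self.queue.put_nowait(msg)
        except asyncio.QueueFull:
            logging.warning("Processing queue full, dropping oldest message")
            self.queue.get_nowait()  # drop the oldest message
            self.queue.put_nowait(msg)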
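
An `asyncio.Queue` is only safe to use from the event-loop thread. Where the MQTT client delivers messages on its own network thread (as paho-mqtt does after `loop_start()`), a thread-based buffer is one alternative. The sketch below is such a variant under that assumption, not the original design; `ThreadedThrottle`, `submit`, and the `handler` callback are hypothetical names.

```python
import queue
import threading
import time


class ThreadedThrottle:
    """Bounded, drop-oldest buffer drained by a worker thread at a fixed rate."""

    def __init__(self, handler, max_rate=50, maxsize=2000):
        self.handler = handler
        self.interval = 1.0 / max_rate
        self.queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0  # approximate if several threads call submit()
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._worker.start()

    def stop(self):
        self._stop.set()
        self._worker.join()

    def submit(self, msg):
        """Safe to call from any thread, e.g. from an on_message callback."""
        while True:
            try:
                self.queue.put_nowait(msg)
                return
            except queue.Full:
                try:
                    self.queue.get_nowait()  # discard the oldest entry
                    self.dropped += 1
                except queue.Empty:
                    pass  # the worker drained it first; retry the put

    def _run(self):
        while not self._stop.is_set():
            try:
                msg = self.queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self.handler(msg)
            time.sleep(self.interval)
```

With `maxsize=2000` and small payloads, the buffer's memory use stays bounded no matter how large the broker-side backlog is; the trade-off is that the oldest messages are lost once the buffer fills.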

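Case 4: TLS Handshake Failure Caused by an Expired Certificate

Symptoms

On the same day, every device suddenly became unable to connect to the broker, and the logs showed TLS handshake failures.

Investigation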

bash
# Check the certificate validity period
openssl x509 -in /etc/mqtt/server.crt -noout -dates
# notBefore=Jan  1 00:00:00 2023 GMT
# notAfter=Jan  1 00:00:00 2024 GMT  ← expired!

# Verify the connection
openssl s_client -connect broker.example.com:8883 -CAfile ca.crt
# Verify return code: 10 (certificate has expired)

Solution

bash
# 1. Issue a replacement certificate immediately
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr \
  -subj "/CN=broker.example.com/O=MyOrg"
openssl x509 -req -days 3650 -in server.csr \
  -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt

# 2. Hot-reload the EMQX certificate (no restart required)
curl -X PUT -u admin:public \
  http://emqx:18083/api/v5/listeners/ssl:default \
  -H "Content-Type: application/json" \
  -d '{"ssl_options": {"certfile": "/etc/emqx/certs/server.crt"}}'

# 3. Set up certificate expiry monitoring
cat > /usr/bin/cert-monitor.sh <<'EOF'
#!/bin/bash
CERT_FILE="/etc/mqtt/server.crt"
WARN_DAYS=30

EXPIRY=$(openssl x509 -in "$CERT_FILE" -noout -enddate | cut -d= -f2)
EXPIRY_EPOCH=$(date -d "$EXPIRY" +%s)
NOW_EPOCH=$(date +%s)
DAYS_LEFT=$(( (EXPIRY_EPOCH - NOW_EPOCH) / 86400 ))

if [ "$DAYS_LEFT" -lt "$WARN_DAYS" ]; then
    echo "WARNING: Certificate expires in $DAYS_LEFT days!"
    # Send an alert...
fi
EOF

# 4. Use Let's Encrypt automatic renewal (for public-facing brokers)
certbot renew --pre-hook "..." --post-hook "systemctl reload emqx"
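
The same expiry check can live inside a Python-based monitoring job instead of a shell script. The following is a minimal sketch using only the standard library; `days_until_expiry` is a hypothetical helper, and it assumes the `notAfter` text has already been obtained in the format that `openssl x509 -enddate` prints.

```python
import ssl
import time


def days_until_expiry(not_after, now=None):
    """Return whole days until a certificate's notAfter timestamp.

    not_after uses the text format printed by `openssl x509 -enddate`,
    e.g. "Jan  1 00:00:00 2024 GMT". A negative result means the
    certificate has already expired.
    """
    expiry_epoch = ssl.cert_time_to_seconds(not_after)
    if now is None:
        now = time.time()
    return int((expiry_epoch - now) // 86400)


if __name__ == "__main__":
    WARN_DAYS = 30
    left = days_until_expiry("Jan  1 00:00:00 2024 GMT")
    if left < WARN_DAYS:
        print(f"WARNING: certificate expires in {left} days")
```

The `notAfter` field returned by `ssl.SSLSocket.getpeercert()` uses the same text format, so the helper can also be fed from a live TLS connection.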

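Common Error Codes Quick Reference

| Error | Cause | Fix |
| --- | --- | --- |
| Connection Refused (rc=1) | Protocol version mismatch | Check the client's MQTT version setting |
| Connection Refused (rc=2) | Invalid Client ID | Check the Client ID length and characters |
| Connection Refused (rc=3) | Broker unavailable | Check the broker service status |
| Connection Refused (rc=4) | Wrong username or password | Check the authentication configuration |
| Connection Refused (rc=5) | Not authorized | Check the ACL rules |
| Disconnect (rc=141) | Keep Alive timeout | Increase keepalive or check the network |
| TLS Handshake Failed | Certificate problem | Check the certificate validity period and CA chain |
| Message Too Large | Exceeds the broker limit | Send in chunks or increase max_packet_size |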
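
For logging, numeric codes can be turned into readable text with a small lookup. The sketch below covers the MQTT 3.1.1 CONNACK return codes and a handful of MQTT 5.0 reason codes taken from the specifications; `describe_rc` is a hypothetical helper, and paho-mqtt users may prefer the library's own `connack_string` function.

```python
# MQTT 3.1.1 CONNACK return codes
CONNACK_V311 = {
    0: "Connection accepted",
    1: "Connection refused: unacceptable protocol version",
    2: "Connection refused: identifier rejected",
    3: "Connection refused: server unavailable",
    4: "Connection refused: bad user name or password",
    5: "Connection refused: not authorized",
}

# A few MQTT 5.0 reason codes (not the full list)
REASON_V5 = {
    0x82: "Protocol error",
    0x87: "Not authorized",
    0x8D: "Keep Alive timeout",
    0x8E: "Session taken over",
    0x95: "Packet too large",
}


def describe_rc(rc, mqtt5=False):
    """Map a numeric code to text, falling back to a generic message."""
    table = REASON_V5 if mqtt5 else CONNACK_V311
    return table.get(rc, f"Unknown code {rc} (0x{rc:02X})")


print(describe_rc(4))
print(describe_rc(141, mqtt5=True))
```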

Debugging Commands

bash
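# Subscribe to all topics (for debugging only)
mosquitto_sub -h broker -t '#' -v

# Publish a test message
mosquitto_pub -h broker -t 'test/hello' -m 'world' -q 1

# Follow the EMQX logs in real time
docker logs -f emqx | grep -E "ERROR|WARN"

# Capture MQTT traffic for analysis
tcpdump -i eth0 -w mqtt.pcap port 1883
# Open the capture in Wireshark and apply the display filter: mqtt

# Load test (the emqtt-bench tool installs its executable as emqtt_bench)
emqtt_bench pub -h broker -t 'bench/%i' -c 1000 -n 100 -q 1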

褚成志的IoT笔记