Skip to content

Kafka 故障处理案例

案例一:消费者 Lag 持续增长,处理能力不足

故障现象

IoT 数据处理服务的消费者 Lag 从正常的 100 条增长到 50 万条,且持续增长,数据处理延迟超过 30 分钟。

排查过程

bash
# 1. 查看消费者 Lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group iot-processor --describe

# TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID
# iot-telemetry   0          100000          600000          500000  processor-1
# iot-telemetry   1          200000          700000          500000  processor-2
# iot-telemetry   2          (none)          600000          600000  -  ← 无消费者!

# 发现 Partition 2 没有消费者分配

# 2. 查看消费者组成员
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group iot-processor --describe --members

# 只有 2 个消费者实例,但 Topic 有 3 个分区

# 3. 查看消费者处理速率
# 通过 JMX 或 Prometheus 查看 records-consumed-rate
# 发现每个消费者每秒处理约 200 条,而生产速率为 1500 条/秒

根本原因

  1. 消费者实例数(2)少于分区数(3),导致一个分区无人消费
  2. 每条消息处理包含同步数据库写入,耗时约 5ms,限制了吞吐量

解决方案

bash
# 1. 立即扩容消费者实例(Kubernetes 场景)
kubectl scale deployment iot-processor --replicas=6

# 2. 优化数据库写入(批量写入)
java
// 改为批量写入,减少数据库 RTT
public class BatchDatabaseWriter {
    private final List<TelemetryRecord> buffer = new ArrayList<>();
    private static final int BATCH_SIZE = 500;

    public void processRecord(ConsumerRecord<String, String> record) {
        buffer.add(parseRecord(record));

        if (buffer.size() >= BATCH_SIZE) {
            flushToDatabase();
        }
    }

    private void flushToDatabase() {
        if (buffer.isEmpty()) return;
        // 批量插入,单次 RTT 写入 500 条
        jdbcTemplate.batchUpdate(INSERT_SQL,
            buffer.stream()
                  .map(r -> new Object[]{r.deviceId, r.timestamp, r.value})
                  .collect(Collectors.toList()));
        buffer.clear();
    }
}

案例二:Broker Leader 选举风暴

故障现象

Kafka 集群在高负载期间频繁触发 Leader 重新选举,生产者出现大量 NotLeaderForPartitionException,消息延迟飙升。

排查过程

bash
# 查看 Broker 日志
grep "LeaderElection\|NotLeaderForPartition" /var/log/kafka/server.log | tail -100

# 查看 ISR 变化
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --topic iot-telemetry | grep "Isr:"
# 发现 ISR 频繁缩减:Isr: 1,2,3 → Isr: 1 → Isr: 1,2,3

# 检查 Broker 2 和 3 的 GC 日志
grep "GC pause\|Full GC" /var/log/kafka/kafkaServer-gc.log | tail -20
# 发现 Full GC 暂停时间超过 30 秒!

# 查看 JVM 堆内存配置
ps aux | grep kafka | grep -o '\-Xmx[^ ]*'
# -Xmx6g  ← 堆内存 6GB,但机器只有 8GB,OS 页缓存严重不足

根本原因

JVM 堆内存配置过大,挤占了操作系统页缓存(Kafka 严重依赖页缓存),导致频繁 Full GC,Broker 响应超时,触发 Leader 重选。

解决方案

bash
# 1. 调整 JVM 堆内存(Kafka 推荐 4-6GB,不超过物理内存的 50%)
# kafka-server-start.sh 中修改:
export KAFKA_HEAP_OPTS="-Xmx4g -Xms4g"

# 2. 使用 G1GC 减少 GC 停顿
export KAFKA_JVM_PERFORMANCE_OPTS="-server \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=20 \
  -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:+ExplicitGCInvokesConcurrent \
  -Djava.awt.headless=true"

# 3. 调整副本同步超时(给 GC 留余量)
# server.properties:
replica.lag.time.max.ms=60000  # 从 30s 增加到 60s
zookeeper.session.timeout.ms=18000

案例三:消息重复消费导致数据库重复写入

故障现象

TimescaleDB 中出现大量重复时序数据,同一设备同一时间戳的数据存在多条记录。

排查过程

bash
# 查询重复数据
SELECT device_id, timestamp, COUNT(*) as cnt
FROM iot_telemetry
GROUP BY device_id, timestamp
HAVING COUNT(*) > 1
ORDER BY cnt DESC
LIMIT 10;

# 发现重复率约 0.3%,集中在消费者重启时间点附近

# 查看消费者提交日志
grep "commitSync\|Rebalance" /var/log/iot-processor.log | tail -50
# 发现 Rebalance 期间有消息被重复处理

根本原因

消费者在处理消息后、提交 offset 前发生 Rebalance,导致消息被重新分配给其他消费者实例重复处理。

解决方案

sql
-- 方案1:数据库层面去重(TimescaleDB)
-- 使用 ON CONFLICT DO NOTHING
INSERT INTO iot_telemetry (device_id, timestamp, value, unit)
VALUES ($1, $2, $3, $4)
ON CONFLICT (device_id, timestamp) DO NOTHING;

-- 创建唯一索引
CREATE UNIQUE INDEX idx_telemetry_unique
ON iot_telemetry (device_id, timestamp);
java
// 方案2:应用层幂等处理
public class IdempotentProcessor {
    private final RedisTemplate<String, String> redis;

    public boolean isProcessed(String messageId) {
        String key = "processed:" + messageId;
        // SET NX EX:原子操作,设置成功说明未处理过
        Boolean isNew = redis.opsForValue()
            .setIfAbsent(key, "1", Duration.ofHours(24));
        return !Boolean.TRUE.equals(isNew);
    }

    public void process(ConsumerRecord<String, String> record) {
        String messageId = record.topic() + ":" +
                           record.partition() + ":" +
                           record.offset();

        if (isProcessed(messageId)) {
            log.debug("Skipping duplicate message: {}", messageId);
            return;
        }

        // 处理消息...
        writeToDatabase(record);
    }
}

案例四:Topic 分区数不足,扩容后消息乱序

故障现象

为提升吞吐量,将 iot-commands Topic 从 6 个分区扩容到 12 个分区后,同一设备的控制指令出现乱序执行。

根本原因

Kafka 分区扩容后,原有 Key 的哈希路由结果改变,同一设备的消息可能路由到不同分区,而不同分区间消息顺序无法保证。

解决方案

java
// 方案1:扩容前迁移数据,使用自定义分区器保持路由一致性
public class StablePartitioner implements Partitioner {
    private static final int ORIGINAL_PARTITION_COUNT = 6;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return 0;
        }
        // 使用原始分区数计算,保持路由稳定
        int originalPartition = Math.abs(
            Arrays.hashCode(keyBytes)) % ORIGINAL_PARTITION_COUNT;
        return originalPartition;
    }
}

// 方案2:在消息中加入序列号,消费者端排序
@Data
public class OrderedCommand {
    private String deviceId;
    private long sequence;      // 单调递增序列号
    private long timestamp;
    private String command;
    private String payload;
}

// 消费者端按序列号排序执行
public class OrderedCommandProcessor {
    private final Map<String, TreeMap<Long, OrderedCommand>> pendingCommands
        = new ConcurrentHashMap<>();

    public void process(OrderedCommand cmd) {
        pendingCommands
            .computeIfAbsent(cmd.getDeviceId(), k -> new TreeMap<>())
            .put(cmd.getSequence(), cmd);

        // 按序执行
        TreeMap<Long, OrderedCommand> queue =
            pendingCommands.get(cmd.getDeviceId());
        while (!queue.isEmpty()) {
            Map.Entry<Long, OrderedCommand> first = queue.firstEntry();
            if (isNextExpected(cmd.getDeviceId(), first.getKey())) {
                executeCommand(queue.pollFirstEntry().getValue());
            } else {
                break; // 等待缺失的序列号
            }
        }
    }
}

常用运维命令

bash
# 查看 Topic 详情
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic iot-telemetry

# 查看消费者组列表
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

# 重置消费者偏移量(从头开始重新消费)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group iot-processor --topic iot-telemetry \
  --reset-offsets --to-earliest --execute

# 重置到指定时间点
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group iot-processor --topic iot-telemetry \
  --reset-offsets --to-datetime 2024-01-01T00:00:00.000 --execute

# 查看消息内容
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic iot-telemetry --from-beginning --max-messages 10

# 查看 Broker 配置
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 --describe

# 优先副本选举(恢复 Leader 均衡)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred --all-topic-partitions

褚成志的IoT笔记