Kafka 故障处理案例
案例一:消费者 Lag 持续增长,处理能力不足
故障现象
IoT 数据处理服务的消费者 Lag 从正常的 100 条增长到 50 万条,且持续增长,数据处理延迟超过 30 分钟。
排查过程
bash
# 1. 查看消费者 Lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group iot-processor --describe
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# iot-telemetry 0 100000 600000 500000 processor-1
# iot-telemetry 1 200000 700000 500000 processor-2
# iot-telemetry 2 (none) 600000 600000 - ← 无消费者!
# 发现 Partition 2 没有消费者分配
# 2. 查看消费者组成员
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group iot-processor --describe --members
# 只有 2 个消费者实例,但 Topic 有 3 个分区
# 3. 查看消费者处理速率
# 通过 JMX 或 Prometheus 查看 records-consumed-rate
# 发现每个消费者每秒处理约 200 条,而生产速率为 1500 条/秒根本原因
- 消费者实例数(2)少于分区数(3),导致一个分区无人消费
- 每条消息处理包含同步数据库写入,耗时约 5ms,限制了吞吐量
解决方案
bash
# 1. 立即扩容消费者实例(Kubernetes 场景)
kubectl scale deployment iot-processor --replicas=6
# 2. 优化数据库写入(批量写入)java
// 改为批量写入,减少数据库 RTT
public class BatchDatabaseWriter {
private final List<TelemetryRecord> buffer = new ArrayList<>();
private static final int BATCH_SIZE = 500;
public void processRecord(ConsumerRecord<String, String> record) {
buffer.add(parseRecord(record));
if (buffer.size() >= BATCH_SIZE) {
flushToDatabase();
}
}
private void flushToDatabase() {
if (buffer.isEmpty()) return;
// 批量插入,单次 RTT 写入 500 条
jdbcTemplate.batchUpdate(INSERT_SQL,
buffer.stream()
.map(r -> new Object[]{r.deviceId, r.timestamp, r.value})
.collect(Collectors.toList()));
buffer.clear();
}
}案例二:Broker Leader 选举风暴
故障现象
Kafka 集群在高负载期间频繁触发 Leader 重新选举,生产者出现大量 NotLeaderForPartitionException,消息延迟飙升。
排查过程
bash
# 查看 Broker 日志
grep "LeaderElection\|NotLeaderForPartition" /var/log/kafka/server.log | tail -100
# 查看 ISR 变化
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --topic iot-telemetry | grep "Isr:"
# 发现 ISR 频繁缩减:Isr: 1,2,3 → Isr: 1 → Isr: 1,2,3
# 检查 Broker 2 和 3 的 GC 日志
grep "GC pause\|Full GC" /var/log/kafka/kafkaServer-gc.log | tail -20
# 发现 Full GC 暂停时间超过 30 秒!
# 查看 JVM 堆内存配置
ps aux | grep kafka | grep -o '\-Xmx[^ ]*'
# -Xmx6g ← 堆内存 6GB,但机器只有 8GB,OS 页缓存严重不足根本原因
JVM 堆内存配置过大,挤占了操作系统页缓存(Kafka 严重依赖页缓存),导致频繁 Full GC,Broker 响应超时,触发 Leader 重选。
解决方案
bash
# 1. 调整 JVM 堆内存(Kafka 推荐 4-6GB,不超过物理内存的 50%)
# kafka-server-start.sh 中修改:
export KAFKA_HEAP_OPTS="-Xmx4g -Xms4g"
# 2. 使用 G1GC 减少 GC 停顿
export KAFKA_JVM_PERFORMANCE_OPTS="-server \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-Djava.awt.headless=true"
# 3. 调整副本同步超时(给 GC 留余量)
# server.properties:
replica.lag.time.max.ms=60000 # 从 30s 增加到 60s
zookeeper.session.timeout.ms=18000案例三:消息重复消费导致数据库重复写入
故障现象
TimescaleDB 中出现大量重复时序数据,同一设备同一时间戳的数据存在多条记录。
排查过程
bash
# 查询重复数据
SELECT device_id, timestamp, COUNT(*) as cnt
FROM iot_telemetry
GROUP BY device_id, timestamp
HAVING COUNT(*) > 1
ORDER BY cnt DESC
LIMIT 10;
# 发现重复率约 0.3%,集中在消费者重启时间点附近
# 查看消费者提交日志
grep "commitSync\|Rebalance" /var/log/iot-processor.log | tail -50
# 发现 Rebalance 期间有消息被重复处理根本原因
消费者在处理消息后、提交 offset 前发生 Rebalance,导致消息被重新分配给其他消费者实例重复处理。
解决方案
sql
-- 方案1:数据库层面去重(TimescaleDB)
-- 使用 ON CONFLICT DO NOTHING
INSERT INTO iot_telemetry (device_id, timestamp, value, unit)
VALUES ($1, $2, $3, $4)
ON CONFLICT (device_id, timestamp) DO NOTHING;
-- 创建唯一索引
CREATE UNIQUE INDEX idx_telemetry_unique
ON iot_telemetry (device_id, timestamp);java
// 方案2:应用层幂等处理
public class IdempotentProcessor {
private final RedisTemplate<String, String> redis;
public boolean isProcessed(String messageId) {
String key = "processed:" + messageId;
// SET NX EX:原子操作,设置成功说明未处理过
Boolean isNew = redis.opsForValue()
.setIfAbsent(key, "1", Duration.ofHours(24));
return !Boolean.TRUE.equals(isNew);
}
public void process(ConsumerRecord<String, String> record) {
String messageId = record.topic() + ":" +
record.partition() + ":" +
record.offset();
if (isProcessed(messageId)) {
log.debug("Skipping duplicate message: {}", messageId);
return;
}
// 处理消息...
writeToDatabase(record);
}
}案例四:Topic 分区数不足,扩容后消息乱序
故障现象
为提升吞吐量,将 iot-commands Topic 从 6 个分区扩容到 12 个分区后,同一设备的控制指令出现乱序执行。
根本原因
Kafka 分区扩容后,原有 Key 的哈希路由结果改变,同一设备的消息可能路由到不同分区,而不同分区间消息顺序无法保证。
解决方案
java
// 方案1:扩容前迁移数据,使用自定义分区器保持路由一致性
public class StablePartitioner implements Partitioner {
private static final int ORIGINAL_PARTITION_COUNT = 6;
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return 0;
}
// 使用原始分区数计算,保持路由稳定
int originalPartition = Math.abs(
Arrays.hashCode(keyBytes)) % ORIGINAL_PARTITION_COUNT;
return originalPartition;
}
}
// 方案2:在消息中加入序列号,消费者端排序
@Data
public class OrderedCommand {
private String deviceId;
private long sequence; // 单调递增序列号
private long timestamp;
private String command;
private String payload;
}
// 消费者端按序列号排序执行
public class OrderedCommandProcessor {
private final Map<String, TreeMap<Long, OrderedCommand>> pendingCommands
= new ConcurrentHashMap<>();
public void process(OrderedCommand cmd) {
pendingCommands
.computeIfAbsent(cmd.getDeviceId(), k -> new TreeMap<>())
.put(cmd.getSequence(), cmd);
// 按序执行
TreeMap<Long, OrderedCommand> queue =
pendingCommands.get(cmd.getDeviceId());
while (!queue.isEmpty()) {
Map.Entry<Long, OrderedCommand> first = queue.firstEntry();
if (isNextExpected(cmd.getDeviceId(), first.getKey())) {
executeCommand(queue.pollFirstEntry().getValue());
} else {
break; // 等待缺失的序列号
}
}
}
}常用运维命令
bash
# 查看 Topic 详情
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic iot-telemetry
# 查看消费者组列表
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# 重置消费者偏移量(从头开始重新消费)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group iot-processor --topic iot-telemetry \
--reset-offsets --to-earliest --execute
# 重置到指定时间点
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group iot-processor --topic iot-telemetry \
--reset-offsets --to-datetime 2024-01-01T00:00:00.000 --execute
# 查看消息内容
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic iot-telemetry --from-beginning --max-messages 10
# 查看 Broker 配置
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type brokers --entity-name 1 --describe
# 优先副本选举(恢复 Leader 均衡)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type preferred --all-topic-partitions