Kafka 最佳实践
分区策略设计
分区数量计算
分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者实例数)
单分区写入吞吐:约 10-20 MB/s(取决于消息大小和硬件)
单分区读取吞吐:约 50-100 MB/s
示例:
目标写入:200 MB/s
单分区写入:10 MB/s
最小分区数:200 / 10 = 20 个分区
建议:分区数设为消费者实例数的整数倍,便于均匀分配

IoT 场景分区键设计
java
// Strategy 1: partition by device ID (guarantees per-device message ordering)
String partitionKey = deviceId;
// Strategy 2: partition by site + device type (balances load across partitions)
String partitionKey = siteId + ":" + deviceType;
// Strategy 3: time bucketing (suited to time-series analysis)
String partitionKey = deviceId + ":" + (timestamp / 60000); // bucket per minute
// Caution: avoid hot partitions
// Wrong: every message uses the same key -> all routed to a single partition
// 正确:key 分布均匀,避免某些设备数据量远大于其他

可靠性配置
生产者可靠性
java
// Maximum-reliability configuration (for control commands, billing data)
Properties reliableProps = new Properties();
reliableProps.put(ProducerConfig.ACKS_CONFIG, "all");
reliableProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// With idempotence enabled, up to 5 in-flight requests still preserve ordering
reliableProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
reliableProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
reliableProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes
// High-throughput configuration (telemetry data; a small amount of loss is acceptable)
Properties throughputProps = new Properties();
throughputProps.put(ProducerConfig.ACKS_CONFIG, "1"); // wait for leader ack only
throughputProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);
throughputProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB
throughputProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

事务性生产者(精确一次语义)
java
// Use case: control commands, atomic writes spanning multiple topics
Properties txProps = new Properties();
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "iot-control-producer-1");
txProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> txProducer = new KafkaProducer<>(txProps);
txProducer.initTransactions();
try {
txProducer.beginTransaction();
// Atomically write to multiple topics: both sends commit or neither does
txProducer.send(new ProducerRecord<>("iot-commands", deviceId, command));
txProducer.send(new ProducerRecord<>("iot-audit-log", deviceId, auditEntry));
txProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
// Fatal producer states: the transaction cannot be aborted; close and bail out
txProducer.close();
throw e;
} catch (KafkaException e) {
// Any other Kafka error: abort so the transaction is rolled back, then rethrow
txProducer.abortTransaction();
throw e;
}
}

消费者可靠性
java
// Manual commit + at-least-once processing
public class ReliableConsumer {
private final KafkaConsumer<String, String> consumer;
public void run() {
consumer.subscribe(Collections.singletonList("iot-telemetry"));
while (running) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (RetryableException e) {
// Retryable error: do not commit; rewind the partition to this
// record's offset so the next poll re-delivers it, then stop the batch
log.warn("Retryable error, will retry: {}", e.getMessage());
consumer.seek(
new TopicPartition(record.topic(), record.partition()),
record.offset()
);
break;
} catch (NonRetryableException e) {
// Non-retryable: route the record to a dead-letter queue and move on
sendToDeadLetterQueue(record, e);
}
}
// Commit only after the whole batch has been processed
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("Commit failed", e);
}
}
}
}

性能调优
Broker 配置优化
properties
# server.properties
# Network / IO threads
num.network.threads=8
num.io.threads=16
# Log flush policy (favor performance; rely on replication for durability)
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# Log retention
# NOTE: .properties files have no inline comments -- a trailing "# ..."
# becomes part of the value -- so every comment sits on its own line.
# 7 days
log.retention.hours=168
# 100GB per partition
log.retention.bytes=107374182400
# 1GB per segment
log.segment.bytes=1073741824
# Replica synchronization
replica.lag.time.max.ms=30000
replica.fetch.max.bytes=1048576
# Compression: keep whatever codec the producer used
compression.type=producer
# Maximum message size
# 10MB
message.max.bytes=10485760

操作系统优化
bash
# Raise the open-file-descriptor limit (Kafka keeps many segment files open)
echo "* soft nofile 100000" >> /etc/security/limits.conf
echo "* hard nofile 100000" >> /etc/security/limits.conf
# Virtual-memory tuning
sysctl -w vm.swappiness=1
sysctl -w vm.dirty_ratio=80
sysctl -w vm.dirty_background_ratio=5
# Network buffer tuning
sysctl -w net.core.rmem_max=134217728
sysctl -w net.core.wmem_max=134217728
sysctl -w net.ipv4.tcp_rmem="4096 65536 134217728"
sysctl -w net.ipv4.tcp_wmem="4096 65536 134217728"
# Use the XFS filesystem (better suited to Kafka than ext4)
mkfs.xfs -f /dev/sdb
mount -o noatime,nodiratime /dev/sdb /kafka/data

消费者 Lag 监控
bash
# Show consumer-group lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group iot-data-processor --describe
# Sample output:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# iot-telemetry 0 12345 12400 55
# iot-telemetry 1 23456 23500 44
# iot-telemetry 2 34567 34600 33
# Suggested alert thresholds:
# OK: Lag < 1000
# Warning: Lag > 10000
# 严重:Lag > 100000(消费者可能已停止)

python
# Python 监控脚本
from kafka.admin import KafkaAdminClient
from kafka import KafkaConsumer, TopicPartition
def get_consumer_lag(bootstrap_servers, group_id, topic):
    """Compute the total lag of consumer group ``group_id`` on ``topic``.

    Lag = sum over all partitions of (log-end offset - committed offset).

    Uses ``assign()`` rather than ``subscribe()``: an assigned consumer never
    joins the group, so this does not trigger a rebalance of the real consumers.
    """
    # BUG FIX: the original opened the consumer as group_id + "-monitor", but
    # KafkaConsumer.committed() reads the committed offsets of the consumer's
    # OWN group -- the "-monitor" group never commits, so the reported "lag"
    # was always the full log-end offset. Query as the real group instead.
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False
    )
    # partitions_for_topic() returns None for an unknown topic -> treat as empty.
    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in partitions]
    # Log-end offsets: seek to the tail of each partition and read the position.
    consumer.assign(tps)
    consumer.seek_to_end(*tps)
    end_offsets = {tp: consumer.position(tp) for tp in tps}
    # Committed offsets of the group (None when nothing has been committed yet).
    committed = {tp: consumer.committed(tp) or 0 for tp in tps}
    total_lag = sum(end_offsets[tp] - committed[tp] for tp in tps)
    consumer.close()
    return total_lag

Schema 管理
python
# 使用 Avro + Schema Registry 保证数据格式兼容性
import time

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# 定义 Schema
value_schema_str = """
{
"type": "record",
"name": "IoTTelemetry",
"namespace": "com.example.iot",
"fields": [
{"name": "device_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "value", "type": "double"},
{"name": "unit", "type": "string"},
{"name": "quality", "type": "int", "default": 0}
]
}
"""
value_schema = avro.loads(value_schema_str)
producer = AvroProducer(
{
'bootstrap.servers': 'kafka:9092',
'schema.registry.url': 'http://schema-registry:8081'
},
default_value_schema=value_schema
)
producer.produce(
topic='iot-telemetry',
key='device-001',
value={
'device_id': 'device-001',
'timestamp': int(time.time() * 1000),
'value': 25.6,
'unit': 'celsius',
'quality': 0
}
)

数据保留策略
bash
# Time-based retention (the default policy)
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name iot-telemetry-raw \
--alter --add-config retention.ms=86400000 # 1 day
# Size-based retention
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name iot-telemetry-raw \
--alter --add-config retention.bytes=107374182400 # 100GB
# Compacted topic (keeps the latest value per key; good for device state)
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name iot-device-shadow \
--alter --add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.1

多数据中心复制
bash
# Cross-datacenter replication with MirrorMaker 2
# mm2.properties
clusters = source, target
source.bootstrap.servers = kafka-dc1:9092
target.bootstrap.servers = kafka-dc2:9092
source->target.enabled = true
# Replicate every topic whose name matches the regex below ("iot-" prefix).
# Comment moved to its own line: .properties has no inline comments, so a
# trailing "# ..." would have become part of the regex value.
source->target.topics = iot-.*
replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
# Start MirrorMaker 2
connect-mirror-maker.sh mm2.properties