
Kafka Best Practices

Partition Strategy Design

Calculating the Partition Count

Partition count = max(target throughput / per-partition throughput, number of consumer instances)

Per-partition write throughput: roughly 10-20 MB/s (depends on message size and hardware)
Per-partition read throughput: roughly 50-100 MB/s

Example:
Target write throughput: 200 MB/s
Per-partition write throughput: 10 MB/s
Minimum partition count: 200 / 10 = 20 partitions

Recommendation: make the partition count an integer multiple of the number of consumer instances, so partitions divide evenly across consumers.
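
The sizing rule above can be sketched as a small helper. A hedged example: the function and its rounding rule are illustrations, using the per-partition throughput estimates from this section.

```python
import math

def recommended_partitions(target_mb_s: float,
                           per_partition_mb_s: float,
                           consumer_instances: int) -> int:
    """Partition count = max(target / per-partition throughput, consumers),
    rounded up to a multiple of the consumer count for even assignment."""
    needed = max(math.ceil(target_mb_s / per_partition_mb_s),
                 consumer_instances)
    # Round up to the nearest multiple of the consumer count
    return math.ceil(needed / consumer_instances) * consumer_instances

# 200 MB/s target, 10 MB/s per partition, 6 consumers -> 24 partitions
print(recommended_partitions(200, 10, 6))
```

Rounding up (rather than down) keeps throughput headroom while still letting every consumer own the same number of partitions.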

Partition Key Design for IoT Scenarios

java
// Strategy 1: partition by device ID (preserves per-device message ordering)
String partitionKey = deviceId;

// Strategy 2: partition by site + device type (balances load)
String partitionKey = siteId + ":" + deviceType;

// Strategy 3: time bucketing (suits time-series analysis)
String partitionKey = deviceId + ":" + (timestamp / 60000); // per-minute buckets

// Caution: avoid hot partitions
// Wrong: every message shares one key -> all routed to the same partition
// Right: keys are evenly distributed; watch for devices producing far more data than others
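
The hot-partition pitfall is easy to demonstrate by simulating the partitioner. A sketch using an MD5 hash as a deterministic stand-in for Kafka's murmur2-based default partitioner (an assumption for illustration only):

```python
import hashlib
from collections import Counter

def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for Kafka's default (murmur2-based) partitioner
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_partitions

NUM_PARTITIONS = 16

# Bad: every message shares one key -> a single hot partition
hot = Counter(partition_for("all-devices", NUM_PARTITIONS)
              for _ in range(10_000))
print(len(hot))  # 1: one partition receives all 10,000 messages

# Good: per-device keys spread the load across partitions
spread = Counter(partition_for(f"device-{i:04d}", NUM_PARTITIONS)
                 for i in range(10_000))
print(len(spread))  # 16: every partition receives traffic
```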

Reliability Configuration

Producer Reliability

java
// Maximum-reliability configuration (for control commands, billing data)
Properties reliableProps = new Properties();
reliableProps.put(ProducerConfig.ACKS_CONFIG, "all");
reliableProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
reliableProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
reliableProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
reliableProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2 minutes

// High-throughput configuration (for telemetry data; tolerates minor loss)
Properties throughputProps = new Properties();
throughputProps.put(ProducerConfig.ACKS_CONFIG, "1");  // wait for the leader only
throughputProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);
throughputProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);  // 64 KB
throughputProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

Transactional Producer (Exactly-Once Semantics)

java
// Suitable for: control commands, atomic writes across multiple Topics
Properties txProps = new Properties();
txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "iot-control-producer-1");
txProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

KafkaProducer<String, String> txProducer = new KafkaProducer<>(txProps);
txProducer.initTransactions();

try {
    txProducer.beginTransaction();

    // Atomic write across multiple Topics
    txProducer.send(new ProducerRecord<>("iot-commands", deviceId, command));
    txProducer.send(new ProducerRecord<>("iot-audit-log", deviceId, auditEntry));

    txProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // Fatal errors: the transaction cannot be recovered; close the producer
    txProducer.close();
    throw e;
} catch (KafkaException e) {
    txProducer.abortTransaction();
    throw e;
}

Consumer Reliability

java
// Manual commit + at-least-once processing
public class ReliableConsumer {
    private final KafkaConsumer<String, String> consumer;

    public void run() {
        consumer.subscribe(Collections.singletonList("iot-telemetry"));

        while (running) {
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(100));

            // Track only the offsets of records we actually handled, so a
            // failed record is never committed past
            Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

            for (ConsumerRecord<String, String> record : records) {
                try {
                    processRecord(record);
                    processed.put(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)
                    );
                } catch (RetryableException e) {
                    // Retryable error: do not commit; rewind and reprocess next poll
                    log.warn("Retryable error, will retry: {}", e.getMessage());
                    consumer.seek(
                        new TopicPartition(record.topic(), record.partition()),
                        record.offset()
                    );
                    break;
                } catch (NonRetryableException e) {
                    // Non-retryable: route to the dead-letter queue and move on
                    sendToDeadLetterQueue(record, e);
                    processed.put(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)
                    );
                }
            }

            // Commit only the handled offsets. A bare commitSync() would
            // commit the whole batch returned by poll(), including records
            // after a retryable failure.
            if (!processed.isEmpty()) {
                try {
                    consumer.commitSync(processed);
                } catch (CommitFailedException e) {
                    log.error("Commit failed", e);
                }
            }
        }
    }
}
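
The sendToDeadLetterQueue helper above is left undefined. A minimal sketch of the dead-letter record it might publish, shown in Python for brevity; the header names and DLQ topic name are assumptions, not an established convention:

```python
import json
import time

def build_dead_letter_record(record: dict, error: Exception,
                             dlq_topic: str = "iot-telemetry-dlq") -> dict:
    """Wrap a failed record with enough context to diagnose and replay it."""
    return {
        "topic": dlq_topic,
        "key": record["key"],
        "value": record["value"],  # original payload, unchanged
        "headers": {
            "dlq.original.topic": record["topic"],
            "dlq.original.partition": str(record["partition"]),
            "dlq.original.offset": str(record["offset"]),
            "dlq.error.class": type(error).__name__,
            "dlq.error.message": str(error),
            "dlq.failed.at": str(int(time.time() * 1000)),
        },
    }

failed = {"topic": "iot-telemetry", "partition": 3, "offset": 12345,
          "key": "device-001", "value": '{"value": 25.6}'}
dlq = build_dead_letter_record(failed, ValueError("bad unit"))
print(json.dumps(dlq, indent=2))
```

Keeping the original payload untouched and recording provenance in headers makes replaying from the DLQ straightforward.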

Performance Tuning

Broker Configuration Tuning

properties
# server.properties

# Network and I/O threads
num.network.threads=8
num.io.threads=16

# Log flush policy (performance first; rely on replication for durability)
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# Log retention
# (Java properties files do not support trailing comments, so each comment
# goes on its own line)
# 7 days
log.retention.hours=168
# 100 GB per partition
log.retention.bytes=107374182400
# 1 GB per segment
log.segment.bytes=1073741824

# Replica synchronization
replica.lag.time.max.ms=30000
replica.fetch.max.bytes=1048576

# Compression: keep the producer's compression codec
compression.type=producer

# Maximum message size: 10 MB
message.max.bytes=10485760
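
A quick sanity check relating the size-based retention above to write throughput. A sketch: the 10 MB/s figure is the per-partition write estimate from earlier in this document.

```python
def retention_hours_covered(retention_bytes: int, write_mb_s: float) -> float:
    """How many hours of data fit in retention.bytes at a given write rate."""
    bytes_per_hour = write_mb_s * 1024 * 1024 * 3600
    return retention_bytes / bytes_per_hour

# 100 GB per partition at a sustained 10 MB/s fills in under 3 hours,
# far short of the 7-day log.retention.hours, so for a hot partition
# retention.bytes is the binding limit
print(round(retention_hours_covered(107374182400, 10), 1))
```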

Operating System Tuning

bash
# Raise the file-descriptor limit
echo "* soft nofile 100000" >> /etc/security/limits.conf
echo "* hard nofile 100000" >> /etc/security/limits.conf

# Tune virtual memory
sysctl -w vm.swappiness=1
sysctl -w vm.dirty_ratio=80
sysctl -w vm.dirty_background_ratio=5

# Network tuning
sysctl -w net.core.rmem_max=134217728
sysctl -w net.core.wmem_max=134217728
sysctl -w net.ipv4.tcp_rmem="4096 65536 134217728"
sysctl -w net.ipv4.tcp_wmem="4096 65536 134217728"

# Use the XFS filesystem (a better fit for Kafka than ext4)
mkfs.xfs -f /dev/sdb
mount -o noatime,nodiratime /dev/sdb /kafka/data

Monitoring Consumer Lag

bash
# Inspect consumer lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group iot-data-processor --describe

# Sample output:
# TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# iot-telemetry      0          12345           12400           55
# iot-telemetry      1          23456           23500           44
# iot-telemetry      2          34567           34600           33

# Suggested alert thresholds:
# Normal:   lag < 1,000
# Warning:  lag > 10,000
# Critical: lag > 100,000 (the consumer may have stopped)

python
# Python monitoring script
from kafka import KafkaConsumer, TopicPartition

def get_consumer_lag(bootstrap_servers, group_id, topic):
    # Using assign() (rather than subscribe()) keeps this consumer out of
    # the group's rebalancing, so monitoring does not disturb the real
    # consumers. The group_id must be the group being monitored, otherwise
    # committed() would return another group's offsets.
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False
    )

    partitions = consumer.partitions_for_topic(topic)
    tps = [TopicPartition(topic, p) for p in partitions]

    # Current end offset of each partition
    consumer.assign(tps)
    consumer.seek_to_end(*tps)
    end_offsets = {tp: consumer.position(tp) for tp in tps}

    # Offsets the consumer group has committed
    committed = {tp: consumer.committed(tp) or 0 for tp in tps}

    total_lag = sum(end_offsets[tp] - committed[tp] for tp in tps)
    consumer.close()
    return total_lag
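
The alert thresholds listed above translate directly into a classifier that can sit next to get_consumer_lag. A small sketch; the threshold values are the ones suggested in this section.

```python
def lag_severity(lag: int) -> str:
    """Map a consumer-group lag to an alert level."""
    if lag > 100_000:
        return "critical"  # the consumer may have stopped
    if lag > 10_000:
        return "warning"
    return "ok"

for lag in (500, 50_000, 250_000):
    print(lag, lag_severity(lag))
```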

Schema Management

python
# Use Avro + Schema Registry to keep data formats compatible
import time

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define the schema
value_schema_str = """
{
  "type": "record",
  "name": "IoTTelemetry",
  "namespace": "com.example.iot",
  "fields": [
    {"name": "device_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "value", "type": "double"},
    {"name": "unit", "type": "string"},
    {"name": "quality", "type": "int", "default": 0}
  ]
}
"""

value_schema = avro.loads(value_schema_str)

producer = AvroProducer(
    {
        'bootstrap.servers': 'kafka:9092',
        'schema.registry.url': 'http://schema-registry:8081'
    },
    default_value_schema=value_schema
)

producer.produce(
    topic='iot-telemetry',
    key='device-001',
    value={
        'device_id': 'device-001',
        'timestamp': int(time.time() * 1000),
        'value': 25.6,
        'unit': 'celsius',
        'quality': 0
    }
)

producer.flush()  # block until delivery so a short script does not drop messages
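
Schema Registry's value is that it rejects incompatible schema changes. The core backward-compatibility rule (every field the new schema adds must carry a default, so it can still read old records) can be sketched as a simplified check; this illustrates the rule only and is not Schema Registry's actual implementation:

```python
import json

def can_evolve_backward(old_schema: str, new_schema: str) -> bool:
    """Simplified check: a new record schema is backward compatible if
    every field it adds (relative to the old schema) has a default."""
    old_fields = {f["name"] for f in json.loads(old_schema)["fields"]}
    new_fields = json.loads(new_schema)["fields"]
    return all(f["name"] in old_fields or "default" in f
               for f in new_fields)

old = ('{"type":"record","name":"T","fields":'
       '[{"name":"device_id","type":"string"}]}')
ok = ('{"type":"record","name":"T","fields":'
      '[{"name":"device_id","type":"string"},'
      '{"name":"quality","type":"int","default":0}]}')
bad = ('{"type":"record","name":"T","fields":'
       '[{"name":"device_id","type":"string"},'
       '{"name":"quality","type":"int"}]}')

print(can_evolve_backward(old, ok))   # True: new field has a default
print(can_evolve_backward(old, bad))  # False: new field lacks a default
```

This is why the IoTTelemetry schema gives quality a default of 0: readers with the new schema can still decode records written before the field existed.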

Data Retention Policies

bash
# Retain by time (the default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name iot-telemetry-raw \
  --alter --add-config retention.ms=86400000  # 1 day

# Retain by size
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name iot-telemetry-raw \
  --alter --add-config retention.bytes=107374182400  # 100 GB

# Compacted Topic (keeps the latest value per key; suits device state)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name iot-device-shadow \
  --alter --add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.1
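
What cleanup.policy=compact buys a device-shadow topic can be illustrated by simulating compaction in a few lines. A toy model only: real compaction works segment by segment and keeps records newer than the dirty ratio allows.

```python
def compact(log):
    """Toy log compaction: keep only the latest value per key.
    A None value (a tombstone) deletes the key."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone removes the key
        else:
            state[key] = value
    return state

shadow = compact([
    ("device-001", "online"),
    ("device-002", "online"),
    ("device-001", "offline"),  # supersedes the first record
    ("device-002", None),       # tombstone: device decommissioned
])
print(shadow)  # {'device-001': 'offline'}
```

A consumer that reads the compacted topic from the beginning therefore rebuilds the current state of every device without replaying its full history.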

Multi-Datacenter Replication

bash
# Cross-datacenter replication with MirrorMaker 2
# mm2.properties
clusters = source, target
source.bootstrap.servers = kafka-dc1:9092
target.bootstrap.servers = kafka-dc2:9092

source->target.enabled = true
# Replicate every Topic whose name starts with "iot-"
# (comment kept on its own line: properties files have no trailing comments)
source->target.topics = iot-.*

replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3

# Note: with the default replication policy, replicated topics appear on
# the target cluster prefixed with the source alias, e.g. source.iot-telemetry

# Start MirrorMaker 2
connect-mirror-maker.sh mm2.properties

褚成志's IoT Notes