
Kafka Internals and Ecosystem

What Is Kafka

Apache Kafka is a distributed event-streaming platform, originally developed at LinkedIn and open-sourced in 2011. In IoT scenarios, Kafka typically serves as the high-throughput message bus between the MQTT broker and the backend data-processing systems.

IoT data-flow architecture:

Device Layer       Edge Layer          Cloud Data Pipeline        Storage/Analytics Layer

Sensors ──MQTT──► EMQX   ──Kafka──► Stream Processing ──► TimescaleDB
PLCs    ──MQTT──► Broker   Topic    (Flink/Spark)     ──► ClickHouse
Gateways──HTTP──►         ───────►  Rule Engine       ──► Elasticsearch
                                                      ──► Data Lake

Core Architecture

Kafka cluster architecture:

Producer                        Kafka Cluster                       Consumer
                    ┌─────────────────────────────────────┐
                    │  Broker 1     Broker 2     Broker 3  │
IoT Gateway ──────► │  ┌────────┐  ┌────────┐  ┌────────┐ │ ──► Flink Job
MQTT Bridge ──────► │  │Topic A │  │Topic A │  │Topic A │ │ ──► Spark Job
HTTP Ingest ──────► │  │Part 0  │  │Part 1  │  │Part 2  │ │ ──► DB Writer
                    │  └────────┘  └────────┘  └────────┘ │
                    └─────────────────────────────────────┘

                              ZooKeeper / KRaft
                    (metadata management / leader election)

Core Concepts

Topics and Partitions

Topic: iot-telemetry
├── Partition 0  [offset 0, 1, 2, 3, ...]  → Broker 1 (Leader)
│                                           → Broker 2 (Follower)
├── Partition 1  [offset 0, 1, 2, 3, ...]  → Broker 2 (Leader)
│                                           → Broker 3 (Follower)
└── Partition 2  [offset 0, 1, 2, 3, ...]  → Broker 3 (Leader)
                                            → Broker 1 (Follower)

Key properties:
- Messages are ordered within a partition, but not across partitions
- Each message has a unique offset (monotonically increasing within its partition)
- Messages are retained for 7 days by default (configurable)
- The partition count caps how many consumers in one group can read in parallel
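The offset semantics above can be illustrated without a broker. A minimal sketch using plain Java collections as a stand-in for Kafka's per-partition log (the `ToyTopic` class and the simplified hash routing are illustrative, not Kafka's actual implementation, which hashes keys with murmur2):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a topic: each partition is an append-only list,
// and a message's offset is simply its index in that list.
public class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    public ToyTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
    }

    // Appends a message and returns the offset assigned within its partition.
    public long append(String key, String value) {
        int p = Math.floorMod(key.hashCode(), partitions.size()); // simplified key routing
        List<String> log = partitions.get(p);
        log.add(value);
        return log.size() - 1; // offsets increase monotonically per partition
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(3);
        // Same key -> same partition -> strictly increasing offsets for that device
        long o1 = topic.append("device-001", "t=25.0");
        long o2 = topic.append("device-001", "t=25.1");
        System.out.println(o1 + " " + o2); // prints "0 1"
    }
}
```

Because ordering exists only inside a partition, two messages with different keys may land in different partitions and be consumed in any relative order.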

Consumer Groups

Consumer Group: iot-processors

Topic: iot-telemetry (3 partitions)
├── Partition 0 ──► Consumer Instance 1
├── Partition 1 ──► Consumer Instance 2
└── Partition 2 ──► Consumer Instance 3

Rules:
- Within a consumer group, each partition is consumed by exactly one consumer
- Different consumer groups consume independently of one another
- If there are more consumers than partitions, the extra consumers sit idle
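These rules can be sketched without a broker. The snippet below uses naive round-robin assignment (Kafka's real assignors, such as range and cooperative-sticky, are more involved) just to show the idle-consumer effect:

```java
import java.util.ArrayList;
import java.util.List;

public class GroupAssignment {
    // Assign each partition to exactly one consumer, round-robin.
    // Returns one partition list per consumer; extras get an empty list.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) result.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 4 consumers: the 4th consumer has nothing to read
        System.out.println(assign(3, 4)); // prints "[[0], [1], [2], []]"
    }
}
```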

Message Keys and Partition Routing

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;

// Route by device ID to a fixed partition, so that messages
// from the same device stay ordered
ProducerRecord<String, String> record = new ProducerRecord<>(
    "iot-telemetry",
    "device-001",           // Key: device ID
    jsonPayload             // Value: message payload
);
producer.send(record);

// Custom partitioner: route by site ID
public class SitePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String siteId = extractSiteId((String) key);
        int numPartitions = cluster.partitionCountForTopic(topic);
        // floorMod avoids the negative result Math.abs(hashCode) produces
        // when hashCode() returns Integer.MIN_VALUE
        return Math.floorMod(siteId.hashCode(), numPartitions);
    }

    // Assumes keys shaped like "site-01/device-001"
    private static String extractSiteId(String key) {
        return key.split("/")[0];
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
```
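For the custom partitioner to take effect, it has to be registered on the producer (a config fragment; the class name matches the partitioner sketch above):

```java
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SitePartitioner.class.getName());
```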

Producer Configuration

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Reliability
props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas to ack
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer

// Performance
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16 KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait up to 5 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // LZ4 compression
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  // 32 MB send buffer

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```

Consumer Configuration

```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "iot-data-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Offset management
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // commit manually

// Performance
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);         // at most 500 records per poll
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);         // fetch at least 1 KB
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);        // wait at most 500 ms

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("iot-telemetry", "iot-alarms"));

// Commit offsets manually, only after processing completes
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitSync(); // synchronous commit: no records are lost (at-least-once)
}
```
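What commitSync actually records is the next offset to read, i.e. last processed offset + 1. A broker-free sketch of that bookkeeping, with plain strings standing in for kafka-clients types (the `OffsetBookkeeping` class is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetBookkeeping {
    // committed offset per "topic-partition" = the next offset to consume
    private final Map<String, Long> committed = new HashMap<>();

    // Record that everything up to and including lastProcessed is done.
    public void markProcessed(String topicPartition, long lastProcessed) {
        committed.put(topicPartition, lastProcessed + 1);
    }

    // Where a restarted consumer would resume from.
    public long resumeFrom(String topicPartition) {
        return committed.getOrDefault(topicPartition, 0L);
    }

    public static void main(String[] args) {
        OffsetBookkeeping ob = new OffsetBookkeeping();
        ob.markProcessed("iot-telemetry-0", 41);              // processed offsets 0..41
        System.out.println(ob.resumeFrom("iot-telemetry-0")); // prints 42
    }
}
```

If the consumer crashes between processRecord and commitSync, the uncommitted records are re-delivered after restart, which is why this pattern is at-least-once rather than exactly-once.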

Topic Design for IoT Scenarios

Kafka topic plan for IoT:

iot-telemetry-raw        # raw telemetry (high throughput, short retention)
  partitions: 24, replicas: 2, retention: 1 day

iot-telemetry-processed  # processed data (medium throughput)
  partitions: 12, replicas: 3, retention: 7 days

iot-alarms               # alarm events (low throughput, long retention)
  partitions: 6, replicas: 3, retention: 30 days

iot-commands             # control commands (low throughput, exactly-once)
  partitions: 6, replicas: 3, retention: 7 days, transactions enabled

iot-device-shadow        # device shadow/state (compacted topic)
  partitions: 12, replicas: 3, cleanup.policy=compact
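The retention periods above are applied per topic through the retention.ms topic config, which takes milliseconds. A small sketch that turns the day counts from the plan into config maps (the helper is hypothetical):

```java
import java.util.Map;

public class TopicRetention {
    // retention.ms expects milliseconds; 1 day = 86_400_000 ms
    public static Map<String, String> retentionConfig(int days) {
        return Map.of("retention.ms", String.valueOf(days * 86_400_000L));
    }

    public static void main(String[] args) {
        System.out.println(retentionConfig(1));  // prints "{retention.ms=86400000}"
        System.out.println(retentionConfig(30)); // prints "{retention.ms=2592000000}"
    }
}
```

The compacted iot-device-shadow topic would instead set cleanup.policy=compact, keeping only the latest value per key rather than a time-bounded history.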

Kafka Connect Integration

MQTT Source Connector configuration (EMQX → Kafka):

```json
{
  "name": "mqtt-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "mqtt.server.uri": "tcp://emqx:1883",
    "mqtt.topics": "ess/+/bms/+/data,ess/+/pcs/+/data",
    "kafka.topic": "iot-telemetry-raw",
    "mqtt.qos": "1",
    "mqtt.username": "kafka-bridge",
    "mqtt.password": "secret",
    "tasks.max": "3"
  }
}
```
TimescaleDB Sink Connector configuration:

```json
{
  "name": "timescaledb-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://timescaledb:5432/iotdb",
    "connection.user": "kafka",
    "connection.password": "secret",
    "topics": "iot-telemetry-processed",
    "insert.mode": "insert",
    "auto.create": "true",
    "batch.size": "1000"
  }
}
```

Real-Time Processing with Kafka Streams

```java
// Real-time aggregation of IoT data: per-device one-minute averages
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> rawData = builder.stream("iot-telemetry-raw");

rawData
    .mapValues(value -> parseJson(value))
    .filter((key, data) -> data.getQuality() == 0)  // drop bad-quality samples
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .aggregate(
        () -> new AggregateState(),
        (key, data, state) -> state.add(data),
        Materialized.with(Serdes.String(), aggregateSerde)
    )
    .toStream()
    .mapValues(state -> state.toAverage())
    .to("iot-telemetry-1min-avg");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
```
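The windowing step can be illustrated without a Streams runtime: a tumbling window simply buckets each event by timestamp / windowSize. A stdlib-only sketch of the one-minute average (the class, the row layout, and the "device@window" key format are all hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class TumblingAverage {
    // Compute per-(device, minute-window) averages from (deviceId, tsMillis, value) rows.
    public static Map<String, Double> minuteAverages(Object[][] rows) {
        long windowMs = 60_000L;
        Map<String, double[]> acc = new HashMap<>(); // key -> [sum, count]
        for (Object[] r : rows) {
            String device = (String) r[0];
            long window = ((Long) r[1]) / windowMs;  // tumbling-window bucket
            double value = (Double) r[2];
            double[] sc = acc.computeIfAbsent(device + "@" + window, k -> new double[2]);
            sc[0] += value;
            sc[1] += 1;
        }
        Map<String, Double> avg = new HashMap<>();
        acc.forEach((k, sc) -> avg.put(k, sc[0] / sc[1]));
        return avg;
    }

    public static void main(String[] args) {
        Object[][] rows = {
            {"device-001", 1_000L, 20.0},   // window 0
            {"device-001", 30_000L, 22.0},  // window 0
            {"device-001", 61_000L, 30.0},  // window 1
        };
        // device-001@0 -> 21.0, device-001@1 -> 30.0
        System.out.println(minuteAverages(rows));
    }
}
```

In the real topology, Kafka Streams additionally handles state stores, repartitioning, and fault tolerance; this sketch only shows the bucketing arithmetic.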

Ecosystem Tools

Administration:
├── Kafka UI          → web management console (recommended)
├── AKHQ              → feature-rich web UI
├── Confluent Control Center → enterprise-grade monitoring
└── kafka-topics.sh   → command-line administration

Monitoring:
├── JMX Exporter + Prometheus + Grafana
├── Confluent Metrics Reporter
└── Burrow (consumer lag monitoring)

Schema management:
├── Confluent Schema Registry
├── Avro / Protobuf / JSON Schema
└── schema version compatibility management

Stream processing:
├── Apache Flink      → low latency, strong state management
├── Apache Spark Streaming → unified batch and stream processing
├── Kafka Streams     → lightweight, no separate cluster needed
└── ksqlDB            → SQL over streams

Chu Chengzhi's IoT Notes