Kafka Principles and Ecosystem

What Is Kafka

Apache Kafka is a distributed event-streaming platform, originally developed at LinkedIn and open-sourced in 2011. In IoT scenarios, Kafka typically serves as the high-throughput message bus between the MQTT broker and the backend data-processing systems.
IoT data flow architecture:

Device Layer          Edge Layer             Cloud Data Pipeline       Storage/Analytics

Sensors  ──MQTT──►  EMQX     ──Kafka──►  Stream Processing      ──►  TimescaleDB
PLCs     ──MQTT──►  Broker      Topic    (Flink/Spark)          ──►  ClickHouse
Gateways ──HTTP──►                ──────►  Rule Engine            ──►  Elasticsearch
                                                                  ──►  Data Lake
Core Architecture

Kafka cluster architecture:

Producer                      Kafka Cluster                      Consumer

                ┌──────────────────────────────────────┐
                │   Broker 1     Broker 2     Broker 3  │
IoT Gateway ──► │  ┌────────┐   ┌────────┐  ┌────────┐  │ ──► Flink Job
MQTT Bridge ──► │  │Topic A │   │Topic A │  │Topic A │  │ ──► Spark Job
HTTP Ingest ──► │  │Part 0  │   │Part 1  │  │Part 2  │  │ ──► DB Writer
                │  └────────┘   └────────┘  └────────┘  │
                └──────────────────────────────────────┘
                                   │
                          ZooKeeper / KRaft
                (metadata management / leader election)
Core Concepts

Topic and Partition
Topic: iot-telemetry
├── Partition 0 [offset 0, 1, 2, 3, ...] → Broker 1 (Leader)
│ → Broker 2 (Follower)
├── Partition 1 [offset 0, 1, 2, 3, ...] → Broker 2 (Leader)
│ → Broker 3 (Follower)
└── Partition 2 [offset 0, 1, 2, 3, ...] → Broker 3 (Leader)
→ Broker 1 (Follower)
Key properties:
- Messages are strictly ordered within a partition, but not across partitions
- Every message has a unique offset that increases monotonically within its partition
- Messages are retained for 7 days by default (configurable)
- The partition count caps how many consumers in a group can read in parallel (see the AdminClient sketch below)
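As a minimal sketch of how such a topic is provisioned, the snippet below uses Kafka's AdminClient to create a 3-partition topic; the bootstrap address and replication factor are illustrative values, not taken from the text above.

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions => at most 3 parallel consumers per group; replication factor 2
            NewTopic topic = new NewTopic("iot-telemetry", 3, (short) 2);
            admin.createTopics(Set.of(topic)).all().get();
        }
    }
}
```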
Consumer Groups

Consumer Group: iot-processors
Topic: iot-telemetry (3 partitions)
├── Partition 0 ──► Consumer Instance 1
├── Partition 1 ──► Consumer Instance 2
└── Partition 2 ──► Consumer Instance 3
Rules:
- Within one consumer group, each partition is consumed by exactly one consumer
- Different consumer groups consume independently and do not affect each other
- When there are more consumers than partitions, the surplus consumers stay idle (the listener sketch below makes assignments visible)
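One way to observe these assignment rules at runtime is to pass a ConsumerRebalanceListener when subscribing; a minimal sketch, assuming a KafkaConsumer built as in the consumer-configuration section further down:

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class AssignmentLogger {
    // `consumer` is assumed to be configured elsewhere (see the consumer-configuration section)
    static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Arrays.asList("iot-telemetry"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Fired after a rebalance: the partitions this instance now owns
                System.out.println("Assigned: " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Fired before partitions are reassigned: a good place to commit offsets
                System.out.println("Revoked: " + partitions);
            }
        });
    }
}
```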
Message Keys and Partition Routing

```java
// Route by device ID to a fixed partition so that messages from one device stay ordered
ProducerRecord<String, String> record = new ProducerRecord<>(
    "iot-telemetry",
    "device-001",   // key: device ID
    jsonPayload     // value: message payload
);
producer.send(record);
// Custom partitioner: route by site ID
public class SitePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // extractSiteId is an application-specific helper that parses the site ID from the key
        String siteId = extractSiteId((String) key);
        int numPartitions = cluster.partitionCountForTopic(topic);
        return Math.abs(siteId.hashCode()) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}   // required by Partitioner (via Configurable)

    @Override
    public void close() {}                             // required by Partitioner
}
```
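The custom partitioner only takes effect once it is registered on the producer, via the standard partitioner.class config key:

```java
// Add to the producer Properties shown in the next section
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SitePartitioner.class.getName());
```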
Producer Configuration

```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas to acknowledge
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // idempotent producer
// Performance settings
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16 KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait up to 5 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // LZ4 compression
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  // 32 MB send buffer
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```
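Even with acks=all, a send can still fail once retries are exhausted, so the asynchronous send is usually paired with a callback; a minimal sketch reusing the producer and record from above:

```java
// Asynchronous send: the callback gets metadata on success, an exception on failure
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // e.g. forward to a dead-letter store or raise an alert; here we just log
        System.err.println("Send failed: " + exception.getMessage());
    } else {
        System.out.printf("Delivered to %s-%d @ offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```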
Consumer Configuration

```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "iot-data-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Offset management
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset when none is committed
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // commit offsets manually
// Performance settings
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);   // at most 500 records per poll
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);   // fetch at least 1 KB per request
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);  // wait at most 500 ms for data
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("iot-telemetry", "iot-alarms"));
// Commit offsets manually, after the records have been processed
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
    consumer.commitSync(); // synchronous commit after processing: at-least-once, no message loss
}
```
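Because commitSync() blocks on every iteration, a common refinement (a sketch, not from the original text) is to commit asynchronously in the hot loop and fall back to one synchronous commit on shutdown; `running` is assumed to be a volatile flag flipped by a shutdown hook:

```java
// Async commits while running, one final sync commit before leaving the group
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        consumer.commitAsync(); // non-blocking; a missed commit is covered by the next one
    }
} finally {
    try {
        consumer.commitSync();  // make the final offsets durable
    } finally {
        consumer.close();
    }
}
```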
Topic Design for IoT

IoT Kafka topic plan (the AdminClient sketch after the list shows how to apply these settings):

iot-telemetry-raw        # raw telemetry (high throughput, short retention)
    partitions: 24, replicas: 2, retention: 1 day
iot-telemetry-processed  # processed data (medium throughput)
    partitions: 12, replicas: 3, retention: 7 days
iot-alarms               # alarm events (low throughput, long retention)
    partitions: 6, replicas: 3, retention: 30 days
iot-commands             # control commands (low throughput, exactly-once)
    partitions: 6, replicas: 3, retention: 7 days, transactions enabled
iot-device-shadow        # device shadow/state (compacted topic)
    partitions: 12, replicas: 3, cleanup.policy=compact
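As a sketch of how the per-topic settings above translate into code: NewTopic accepts a map of topic-level configs, and retention.ms / cleanup.policy are standard Kafka config keys. The fragment reuses the `admin` client from the earlier AdminClient example.

```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;

// Raw telemetry: 24 partitions, 2 replicas, 1-day retention (86,400,000 ms)
NewTopic raw = new NewTopic("iot-telemetry-raw", 24, (short) 2)
        .configs(Map.of("retention.ms", "86400000"));

// Device shadow: compacted, so only the latest state per device key is kept
NewTopic shadow = new NewTopic("iot-device-shadow", 12, (short) 3)
        .configs(Map.of("cleanup.policy", "compact"));

admin.createTopics(List.of(raw, shadow)).all().get();
```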
Kafka Connect Integration

```json
// MQTT Source Connector config (EMQX → Kafka)
{
"name": "mqtt-source-connector",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"mqtt.server.uri": "tcp://emqx:1883",
"mqtt.topics": "ess/+/bms/+/data,ess/+/pcs/+/data",
"kafka.topic": "iot-telemetry-raw",
"mqtt.qos": "1",
"mqtt.username": "kafka-bridge",
"mqtt.password": "secret",
"tasks.max": "3"
}
}
```

```json
// TimescaleDB Sink Connector config
{
"name": "timescaledb-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://timescaledb:5432/iotdb",
"connection.user": "kafka",
"connection.password": "secret",
"topics": "iot-telemetry-processed",
"insert.mode": "insert",
"auto.create": "true",
"batch.size": "1000"
}
}
```
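Connector JSON like the two documents above is normally deployed by POSTing it to the Kafka Connect REST API (port 8083 by default); a minimal sketch with Java's built-in HttpClient, where the kafka-connect host name and the local file name are assumptions:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class DeployConnector {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // POST one connector config (e.g. the MQTT source JSON above, saved to a file)
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://kafka-connect:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("mqtt-source-connector.json")))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```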
Kafka Streams Real-Time Processing

```java
// Real-time IoT aggregation: per-device averages over 1-minute windows
// parseJson, AggregateState, and aggregateSerde are application-specific helpers
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> rawData = builder.stream("iot-telemetry-raw");
rawData
    .mapValues(value -> parseJson(value))
    .filter((key, data) -> data.getQuality() == 0) // drop bad-quality readings
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .aggregate(
        () -> new AggregateState(),
        (key, data, state) -> state.add(data),
        Materialized.with(Serdes.String(), aggregateSerde)
    )
    .toStream()
    .mapValues(state -> state.toAverage())
    .to("iot-telemetry-1min-avg");

KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();
```
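The streamsConfig object referenced above is not defined in the snippet; a minimal sketch of the properties it needs (application.id and bootstrap.servers are the only mandatory ones, and the ID shown here is illustrative):

```java
Properties streamsConfig = new Properties();
// application.id doubles as the consumer group ID and the prefix for internal topics
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "iot-1min-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// Default serdes for streams that do not specify their own
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
```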
Ecosystem Tools

Management tools:
├── Kafka UI → web management console (recommended)
├── AKHQ → full-featured web UI
├── Confluent Control Center → enterprise-grade monitoring
└── kafka-topics.sh → command-line administration

Monitoring:
├── JMX Exporter + Prometheus + Grafana
├── Confluent Metrics Reporter
└── Burrow (consumer lag monitoring)

Schema management:
├── Confluent Schema Registry
├── Avro / Protobuf / JSON Schema
└── schema version compatibility management

Stream processing:
├── Apache Flink → low latency, strong state management
├── Apache Spark Streaming → unified batch and stream processing
├── Kafka Streams → lightweight, no separate cluster needed
└── ksqlDB → stream processing in SQL