Kafka开发
Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流处理应用。在 Java 中使用 Kafka,通常涉及生产者、消费者、主题配置和集群管理等方面。
1. 安装和配置 Kafka
- 下载 Kafka 并解压。
- 启动 Zookeeper(Kafka 的依赖服务):
- 启动 Kafka Broker:
2. 引入 Maven 依赖
在你的 Java 项目的 pom.xml 中添加以下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version> <!-- 选择 Kafka 对应的版本 -->
</dependency>
3. 创建 Kafka 生产者
生产者负责将消息发送到 Kafka 主题。
代码示例:生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置生产者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka Broker 地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent: %s | Partition: %d | Offset: %d%n",
record.value(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
// 关闭生产者
producer.close();
}
}
4. 创建 Kafka 消费者
消费者从 Kafka 主题读取消息。
代码示例:消费者
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费
// 创建 Kafka 消费者
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 拉取消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message: %s | Partition: %d | Offset: %d%n",
record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close();
}
}
}
5. 流处理示例
Kafka Streams 是一个强大的流处理库,用于实时处理 Kafka 数据流。
Kafka Streams 示例
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class KafkaStreamsExample {
public static void main(String[] args) {
// 配置流处理属性
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
// 定义流拓扑
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> sourceStream = builder.stream("input-topic");
sourceStream.mapValues(value -> value.toUpperCase())
.to("output-topic");
// 启动流处理应用
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
6. 常见优化建议
- 分区与副本: 根据需求设置分区和副本,提升吞吐量和容错能力。
- 幂等性生产者: 设置
enable.idempotence=true确保消息的可靠投递。 - 消费并发: 配置多个消费者实例,提高消费性能。
- 监控: 使用 Kafka Manager 或 Prometheus 监控集群运行状态。
Kafka碎碎念 文章被收录于专栏
Kafka的一些碎碎念,哈哈哈哈哈
查看11道真题和解析
