Kafka 基础篇
基本操作命令
启动Kafka
# 需要指定配置文件
kafka-server-start.sh -daemon config/server.properties
创建Topic
kafka-topics.sh --create --zookeeper 21.6.162.167:2181 --replication-factor 1 --partitions 1 --topic test
列出所有Topic
kafka-topics.sh --list --zookeeper 21.6.162.167:2181
查看topic的分区信息
kafka-topics.sh --describe --zookeeper 21.6.162.167:2181 --topic test
生产消息
kafka-console-producer.sh --broker-list 21.6.162.167:9093 --topic test
消费消息
1.从最后一条消息的偏移量+1开始消费
kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9093 --topic test
2.从头开始消费
kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9093 --from-beginning --topic test
单播和多播
单播
单播指的是一个消息只能被单个消费者组接收并消费。在这种情况下,该队列仅被一个消费组group所绑定。
kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9093 --group 1 --from-beginning --topic test
多播
多播指的是一个消息可能会被多个消费者组接收并消费。该队列上绑定了多个消费组,当消息到达后,Broker会将消息同时分发给这些组。
列出所有消费组
kafka-consumer-groups.sh --bootstrap-server 21.6.162.167:9093 --list
查看消费组的详细信息
kafka-consumer-groups.sh --bootstrap-server 21.6.162.167:9093 --describe --group group3
默认Topic _consumer-offsets的作用
kafka通过该默认Topic来确保消息的单播和多播产生混乱,确保消息的正常消费。每个Consumer在消费完消息后,会向Broker提交自己消费的最后一条消息的offset,Broker会将这个offset放入到_consumer-offsets队列中,来保存该消费者所在消费组的消息消费情况。为了提高该Topic的并发性,kafka设置了50个分区给该Topic,kafka通过公式:$hash(groupNum)%partitionNum$来计算offset消息应该放入到哪个分区中。kafka会定期清理过期的offset消息,只保留最新的offset消息。
Kafka集群搭建
搭建kafka集群非常简单,kafka集群的搭建需借助zk。只需要在启动kafka的时候指定不同的配置文件,并且在配置文件中配置不同的broker-id,然后指明zk的地址即可。
配置文件示例:
broker.id=0
listeners=PLAINTEXT://21.6.162.167:9093
log.dirs=/tmp/kafka-logs
zookeeper.connect=21.6.162.167:2181
注意:在同一台主机上部署kafka集群需要配置不同的log.dirs
。
操作命令
向集群发消息
kafka-console-producer.sh --broker-list 21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.:9994 --topic topic-2-3
消费消息
kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994 --topic topic-2-3 --group 1
Replicas 集合和 Isr集合
副本是为了保证topic分区的高可用。
集群信息中的replicas表示所有的分区副本的broker节点集合。Isr集合中保存了所有可以同步数据并且已完成同步的副本broker节点集合。当分区的Leader挂掉时,集群会从Isr中选择新的Leader。
消息的顺序性
kafka只能保证消息在partition中的局部顺序性,无法保证整个topic中消费的顺序性。并且一个partition只能被相同消费组中的一个消费者消费,但可以被不同消费组的多个消费者消费。
单个消费者时可以消费多个partition的。当partition数量等于消费者数量时,并发度最高。
Broker、Topic、分区、replication等概念
Broker
Broker负责分发消息到对应的Topic中,有点类似于RabbitMQ中的Exchange。Broker可以被集群化部署,从而提升系统吞吐量。
Topic
每一个主题即为一个队列,并且每个Topic可以有多个分区和多个副本。
分区
分区存在的意义是为了将消息分而治之,从而可以让消息更快投递成功以及更快的消费消息。每一个Topic可以有多个分区,每一个分区拥有队列的一部份数据,并且这些分区会均匀地分布在不同的Broker中。
replication
在创建Topic时,可以配置副本数。kafka会为每一个分区创建副本,副本有两种类型,分别是Leader和Follower,当Leader所在的Broker挂掉后,kafka集群会自动选举出新的Leader。只有Leader可以进行数据的读写,Follower只同步数据,不进行读写。
Java API 收发消息
依赖包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.2</version>
</dependency>
生产者 Producer
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Date;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Scanner scn = new Scanner(System.in);
Properties properties = new Properties();
// 配置kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994");
// 配置用来计算分区号的键的序列化方法
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 配置消息的序列化方法
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer<>(properties);
String topic = "topic-2-3";
while (scn.hasNext()) {
String s = scn.next();
System.out.println("发送消息Timestamp:" + (new Date()).getTime());
stringStringKafkaProducer.send(new ProducerRecord<>(topic, s), new Callback() {
@Override public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("消息发送成功");
System.out.println(recordMetadata.timestamp());
}
});
} }
}
生产者ACK配置
有三种可选配置,分别为:
- 0:消息到达Broker后,直接返回ACK,这种方式下效率是最高的。
- 1:当Leader写入本地log成功后,Broker发送ACK确认消息后,才可以发送下一条消息。这种方式下,性能和安全是均衡的。在特殊情况下,会导致消息丢失,比如在Leader写入消息后,Leader挂掉了。
- -1或all:需要保证
min.insync.replices
(默认为1,推荐配置大于2)个副本写入后,才可以发送下一条消息。这是最强的数据保证,一般只有对数据可靠性要求非常高的场景才会使用,比如金融。
官网:Apache Kafka
重试配置
虽然重试保证了消息的可靠送达,但是重试也可能导致消息在接收方被重复接收。
// 配置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 3);
// 配置重试间隔
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
缓冲区配置
kafka为了提高发送的效率,会先将消息放入到本地缓冲区中,由一个单独的线程从本地缓冲中批量去取消息,发送到Broker。
// 发送消息时的本地缓冲区大小,默认32MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 批量发送消息的大小,默认16KB
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// linger:继续存留 如果缓冲区不够一个batch,则等待linger_ms_config毫秒后将被发送出去
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
消费者
Java客户端实现
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
String groupId = "group-2";
Properties properties = new Properties();
// 配置集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994");
// 配置键和值的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置组ID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
String topic = "topic-2-3";
// 实例化消费者
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)){
// 订阅topic
kafkaConsumer.subscribe(Collections.singleton(topic));
while(true) {
ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.of(2, ChronoUnit.SECONDS));
for (ConsumerRecord<String, String> record : poll) {
System.out.println(String.format("partition: %d offset: %d key: %s value: %s",record.partition(), record.offset(), record.key(), record.value()));
}
}
}
}
}
消息提交配置
Kafka默认自动提交offset。
自动提交
// 自动提交配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
当消费者将消息poll下来后,如果开启了自动提交,则会自动commit最后一条消息的offset到_comsumer_offsets队列中。默认自动提交间隔为1S。
在这种情况下,自动提交并不意味着消息已经被处理了。如果消息被poll下来后,消费者挂掉了,则可能会导致消息丢失。
手动提交
//手动提交配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交又分为手动同步提交与手动异步提交。
手动同步提交
if (poll.count() > 0) {
kafkaConsumer.commitSync();
}
手动异步提交
if (poll.count() > 0) {
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
System.out.println("Commit failed " + map);
System.out.println(e.getMessage());
} }
});
}
其他配置
- 心跳间隔配置
- 会话超时时间配置:通过心跳判断消费者是否存活。
- 最大拉取记录数:
- 最大拉取时间间隔
// Consumer发送心跳包的间隔时间,单位 毫秒
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// Consumer会话的超时时间,如果Broker超过该设置时间未收到Consumer发送的心跳包,则Broker会剔除该Consumer,并进行Re-balance
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
// 配置在一次拉取过程中,最多可以拉取的消息数。如果在长轮询期间,消息数不足该值,则会进行等待,等到超时时间到或者消息数达到该值时,本次长轮询才会结束
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 两次Poll的最大时间间隔,如果Consumer两次Poll的时间间隔超过该值,则Broker会认为该Consumer的消费能力太弱,会将该Consumer剔除,并进行Re-balance
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
消费者的健康状态检查
通过配置心跳间隔和会话超时时间来对消费者的健康状态进行检查,如果发现消费者不可用,则会将消费者踢出消费组,并进行re-balance。
指定消费分区
需要注意的是,指定消费分区时,需要注掉订阅方法调用。
kafkaConsumer.assign(Collections.singleton(new TopicPartition(topic, 1)));
// 订阅topic
//kafkaConsumer.subscribe(Collections.singleton(topic));
当多个消费者同时订阅同一个分区时,并不会发送冲突,而且这几个消费者会同时对该分区的消息进行消费。
消息回溯消费
kafkaConsumer.assign(Collections.singleton(new TopicPartition(topic, 1)));
kafkaConsumer.seekToBeginning(Collections.singleton(new TopicPartition(topic, 1)));
通过seekToBeginning
函数可以让消费者从头开始消费。
指定偏移量消费
kafkaConsumer.assign(Collections.singleton(new TopicPartition(topic, 1)));
kafkaConsumer.seek(new TopicPartition(topic, 1), 10);
通过seek
函数让消费者从指定偏移量开始消费。
指定时间开始消费
Long consumerTimePoint = new Date().getTime() - 60*60*1000;
List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
Map<TopicPartition, Long> timeMap = new HashMap<>();
for (PartitionInfo pi : partitionInfoList) {
timeMap.put(new TopicPartition(topic, pi.partition()), consumerTimePoint);
}
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaConsumer.offsetsForTimes(timeMap);
// 指定要消费的分区
kafkaConsumer.assign(offsetsForTimes.keySet());
// 指定每个分区开始消费偏移量
offsetsForTimes.forEach((topicPartition, offsetAndTimestamp) -> {
if (offsetAndTimestamp == null) {
return;
}
long offset = offsetAndTimestamp.offset();
kafkaConsumer.seek(topicPartition, offset);
});
新消费组的偏移量
默认情况下,新消费组只能消费自己启动后生产者发送的消息。可以通过如下参数进行配置。
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
earliest
:新消费组将从头开始消费,下次启动接着消费。lastest
:默认配置,消费自己启动后收到的消息。
Spring Boot Kafka
1.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.参数配置
spring:
kafka:
bootstrap-servers: 21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994
producer:
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
auto-offset-reset: earliest
max-poll-records: 500
listener:
ack-mode: manual_immediate
参数配置与JavaAPI是一样的,使用配置文件更方便。
3.编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ConsumerService {
@KafkaListener(topics = "topic-2-3", groupId = "default-group-2")
public void listenerGroup(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
String v = record.value();
System.out.println("收到消息" + record.toString());
acknowledgment.acknowledge();
}
}
4.生产者
private final KafkaTemplate<String, String> kafkaTemplate;