Kafka 基础篇

37

基本操作命令

启动Kafka

# 需要指定配置文件
kafka-server-start.sh -daemon config/server.properties

创建Topic

kafka-topics.sh --create --zookeeper 21.6.162.167:2181 --replication-factor 1 --partitions 1 --topic test

列出所有Topic

kafka-topics.sh --list --zookeeper 21.6.162.167:2181

查看topic的分区信息

kafka-topics.sh --describe --zookeeper 21.6.162.167:2181 --topic test

生产消息

kafka-console-producer.sh --broker-list 21.6.162.167:9093 --topic test

消费消息
1.从最后一条消息的偏移量+1开始消费

kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9093 --topic test

2.从头开始消费

kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9093 --from-beginning --topic test

单播和多播

单播
单播指的是一个消息只能被单个消费者组接收并消费。在这种情况下,该队列仅被一个消费组group所绑定。

kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9093 --group 1 --from-beginning --topic test

多播
多播指的是一个消息可能会被多个消费者组接收并消费。该队列上绑定了多个消费组,当消息到达后,Broker会将消息同时分发给这些组。

列出所有消费组

kafka-consumer-groups.sh --bootstrap-server 21.6.162.167:9093 --list

查看消费组的详细信息

kafka-consumer-groups.sh --bootstrap-server 21.6.162.167:9093 --describe --group group3

默认Topic _consumer-offsets的作用

kafka通过该默认Topic来确保消息的单播和多播产生混乱,确保消息的正常消费。每个Consumer在消费完消息后,会向Broker提交自己消费的最后一条消息的offset,Broker会将这个offset放入到_consumer-offsets队列中,来保存该消费者所在消费组的消息消费情况。为了提高该Topic的并发性,kafka设置了50个分区给该Topic,kafka通过公式:$hash(groupNum)%partitionNum$来计算offset消息应该放入到哪个分区中。kafka会定期清理过期的offset消息,只保留最新的offset消息。

Kafka集群搭建

搭建kafka集群非常简单,kafka集群的搭建需借助zk。只需要在启动kafka的时候指定不同的配置文件,并且在配置文件中配置不同的broker-id,然后指明zk的地址即可。
配置文件示例:

broker.id=0
listeners=PLAINTEXT://21.6.162.167:9093
log.dirs=/tmp/kafka-logs
zookeeper.connect=21.6.162.167:2181

注意:在同一台主机上部署kafka集群需要配置不同的log.dirs

操作命令

向集群发消息

kafka-console-producer.sh --broker-list 21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.:9994 --topic topic-2-3

消费消息

kafka-console-consumer.sh --bootstrap-server 21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994 --topic topic-2-3 --group 1

Replicas 集合和 Isr集合

副本是为了保证topic分区的高可用。
集群信息中的replicas表示所有的分区副本的broker节点集合。Isr集合中保存了所有可以同步数据并且已完成同步的副本broker节点集合。当分区的Leader挂掉时,集群会从Isr中选择新的Leader。

消息的顺序性

kafka只能保证消息在partition中的局部顺序性,无法保证整个topic中消费的顺序性。并且一个partition只能被相同消费组中的一个消费者消费,但可以被不同消费组的多个消费者消费。

单个消费者时可以消费多个partition的。当partition数量等于消费者数量时,并发度最高。

Broker、Topic、分区、replication等概念

Broker
Broker负责分发消息到对应的Topic中,有点类似于RabbitMQ中的Exchange。Broker可以被集群化部署,从而提升系统吞吐量。

Topic
每一个主题即为一个队列,并且每个Topic可以有多个分区和多个副本。

分区
分区存在的意义是为了将消息分而治之,从而可以让消息更快投递成功以及更快的消费消息。每一个Topic可以有多个分区,每一个分区拥有队列的一部份数据,并且这些分区会均匀地分布在不同的Broker中。

replication
在创建Topic时,可以配置副本数。kafka会为每一个分区创建副本,副本有两种类型,分别是Leader和Follower,当Leader所在的Broker挂掉后,kafka集群会自动选举出新的Leader。只有Leader可以进行数据的读写,Follower只同步数据,不进行读写。

Java API 收发消息

依赖包

<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>3.0.2</version>  
</dependency>

生产者 Producer

import org.apache.kafka.clients.producer.*;  
import org.apache.kafka.common.serialization.StringSerializer;  
  
import java.util.Date;  
import java.util.Properties;  
import java.util.Scanner;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.Future;  
  
public class Main {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        Scanner scn = new Scanner(System.in);  
        Properties properties = new Properties();  
        // 配置kafka集群  
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994");  
        // 配置用来计算分区号的键的序列化方法  
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
        // 配置消息的序列化方法  
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  
        KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer<>(properties);  
        String topic = "topic-2-3";  
        while (scn.hasNext()) {  
            String s = scn.next();  
            System.out.println("发送消息Timestamp:" + (new Date()).getTime());  
            stringStringKafkaProducer.send(new ProducerRecord<>(topic, s), new Callback() {  
                @Override                public void onCompletion(RecordMetadata recordMetadata, Exception e) {  
                    System.out.println("消息发送成功");  
                    System.out.println(recordMetadata.timestamp());  
                }  
            });  
        }    }  
}
生产者ACK配置

有三种可选配置,分别为:

  • 0:消息到达Broker后,直接返回ACK,这种方式下效率是最高的。
  • 1:当Leader写入本地log成功后,Broker发送ACK确认消息后,才可以发送下一条消息。这种方式下,性能和安全是均衡的。在特殊情况下,会导致消息丢失,比如在Leader写入消息后,Leader挂掉了。
  • -1或all:需要保证min.insync.replices(默认为1,推荐配置大于2)个副本写入后,才可以发送下一条消息。这是最强的数据保证,一般只有对数据可靠性要求非常高的场景才会使用,比如金融。
    官网:Apache Kafka
重试配置

虽然重试保证了消息的可靠送达,但是重试也可能导致消息在接收方被重复接收。

// 配置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 3);  
// 配置重试间隔
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
缓冲区配置

kafka为了提高发送的效率,会先将消息放入到本地缓冲区中,由一个单独的线程从本地缓冲中批量去取消息,发送到Broker。

// 发送消息时的本地缓冲区大小,默认32MB  
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  
// 批量发送消息的大小,默认16KB  
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  
// linger:继续存留 如果缓冲区不够一个batch,则等待linger_ms_config毫秒后将被发送出去  
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

消费者

Java客户端实现
import org.apache.kafka.clients.consumer.ConsumerConfig;  
import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;  
import org.apache.kafka.clients.producer.KafkaProducer;  
import org.apache.kafka.clients.producer.ProducerConfig;  
import org.apache.kafka.common.serialization.StringDeserializer;  
import org.apache.kafka.common.serialization.StringSerializer;  
  
import java.time.Duration;  
import java.time.temporal.ChronoUnit;  
import java.util.Collections;  
import java.util.Properties;  
import java.util.concurrent.TimeUnit;  
  
public class Main {  
    public static void main(String[] args) {  
        String groupId = "group-2";  
  
        Properties properties = new Properties();  
        // 配置集群地址  
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994");  
        // 配置键和值的反序列化器  
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
        // 配置组ID  
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  
        String topic = "topic-2-3";  
        // 实例化消费者  
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)){  
            // 订阅topic  
            kafkaConsumer.subscribe(Collections.singleton(topic));  
            while(true) {  
                ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.of(2, ChronoUnit.SECONDS));  
                for (ConsumerRecord<String, String> record : poll) {  
                    System.out.println(String.format("partition: %d offset: %d key: %s value: %s",record.partition(), record.offset(), record.key(), record.value()));  
                }  
            }  
        }  
    }  
}
消息提交配置

Kafka默认自动提交offset。
自动提交

// 自动提交配置  
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");  
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

当消费者将消息poll下来后,如果开启了自动提交,则会自动commit最后一条消息的offset到_comsumer_offsets队列中。默认自动提交间隔为1S。
在这种情况下,自动提交并不意味着消息已经被处理了。如果消息被poll下来后,消费者挂掉了,则可能会导致消息丢失。

手动提交

//手动提交配置  
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  

手动提交又分为手动同步提交与手动异步提交。
手动同步提交

if (poll.count() > 0) {  
    kafkaConsumer.commitSync();  
}

手动异步提交

if (poll.count() > 0) {  
    kafkaConsumer.commitAsync(new OffsetCommitCallback() {  
        @Override  
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {  
            if (e != null) {  
                System.out.println("Commit failed " + map);  
                System.out.println(e.getMessage());  
            }        }  
    });  
}
其他配置
  • 心跳间隔配置
  • 会话超时时间配置:通过心跳判断消费者是否存活。
  • 最大拉取记录数:
  • 最大拉取时间间隔
// Consumer发送心跳包的间隔时间,单位 毫秒  
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);  
// Consumer会话的超时时间,如果Broker超过该设置时间未收到Consumer发送的心跳包,则Broker会剔除该Consumer,并进行Re-balance  
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);  
// 配置在一次拉取过程中,最多可以拉取的消息数。如果在长轮询期间,消息数不足该值,则会进行等待,等到超时时间到或者消息数达到该值时,本次长轮询才会结束  
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  
// 两次Poll的最大时间间隔,如果Consumer两次Poll的时间间隔超过该值,则Broker会认为该Consumer的消费能力太弱,会将该Consumer剔除,并进行Re-balance  
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);

消费者的健康状态检查
通过配置心跳间隔会话超时时间来对消费者的健康状态进行检查,如果发现消费者不可用,则会将消费者踢出消费组,并进行re-balance。

指定消费分区

需要注意的是,指定消费分区时,需要注掉订阅方法调用。

kafkaConsumer.assign(Collections.singleton(new TopicPartition(topic, 1)));  
// 订阅topic  
//kafkaConsumer.subscribe(Collections.singleton(topic));

当多个消费者同时订阅同一个分区时,并不会发送冲突,而且这几个消费者会同时对该分区的消息进行消费。

消息回溯消费
kafkaConsumer.assign(Collections.singleton(new TopicPartition(topic, 1)));  
kafkaConsumer.seekToBeginning(Collections.singleton(new TopicPartition(topic, 1)));

通过seekToBeginning函数可以让消费者从头开始消费。

指定偏移量消费
kafkaConsumer.assign(Collections.singleton(new TopicPartition(topic, 1)));  
kafkaConsumer.seek(new TopicPartition(topic, 1), 10);

通过seek函数让消费者从指定偏移量开始消费。

指定时间开始消费
Long consumerTimePoint = new Date().getTime() - 60*60*1000;  
List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);  
Map<TopicPartition, Long> timeMap = new HashMap<>();  
for (PartitionInfo pi : partitionInfoList) {  
    timeMap.put(new TopicPartition(topic, pi.partition()), consumerTimePoint);  
}  
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = kafkaConsumer.offsetsForTimes(timeMap);  
// 指定要消费的分区  
kafkaConsumer.assign(offsetsForTimes.keySet());  
// 指定每个分区开始消费偏移量  
offsetsForTimes.forEach((topicPartition, offsetAndTimestamp) -> {  
    if (offsetAndTimestamp == null) {  
        return;  
    }  
    long offset = offsetAndTimestamp.offset();  
    kafkaConsumer.seek(topicPartition, offset);  
});
新消费组的偏移量

默认情况下,新消费组只能消费自己启动后生产者发送的消息。可以通过如下参数进行配置。

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  • earliest:新消费组将从头开始消费,下次启动接着消费。
  • lastest:默认配置,消费自己启动后收到的消息。

Spring Boot Kafka

1.引入依赖

<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
</dependency>  
  
<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-web</artifactId>  
</dependency>

2.参数配置

spring:  
  kafka:  
    bootstrap-servers: 21.6.162.167:9991,21.6.162.167:9992,21.6.162.167:9993,21.6.162.167:9994  
    producer:  
      acks: 1  
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  
      value-serializer: org.apache.kafka.common.serialization.StringSerializer  
    consumer:  
      group-id: default-group  
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
      enable-auto-commit: false  
      auto-offset-reset: earliest  
      max-poll-records: 500  
    listener:  
      ack-mode: manual_immediate

参数配置与JavaAPI是一样的,使用配置文件更方便。

3.编写消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.springframework.kafka.annotation.KafkaListener;  
import org.springframework.kafka.support.Acknowledgment;  
import org.springframework.stereotype.Component;  
  
import java.util.List;  
  
  
@Component  
public class ConsumerService {  
    @KafkaListener(topics = "topic-2-3", groupId = "default-group-2")  
    public void listenerGroup(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {  
        String v = record.value();  
        System.out.println("收到消息" + record.toString());  
        acknowledgment.acknowledge();  
    }  
}

4.生产者

private final KafkaTemplate<String, String> kafkaTemplate;