RabbitMQ 高级篇

66

基础篇只介绍了RabbitMQ的简单使用,但在一些稍微复杂的情况下,基础篇的内容无法保证数据的一致性。

生产者可靠性

生产者重连

有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

# 连接超时时间  
spring.rabbitmq.connection-timeout=1000ms  
# 开启重连机制  
spring.rabbitmq.template.retry.enabled=true  
# 每次重连时间间隔的倍率  
spring.rabbitmq.template.retry.multiplier=1  
# 最大重连次数  
spring.rabbitmq.template.retry.max-attempts=3  
# 初始的重连间隔  
spring.rabbitmq.template.retry.initial-interval=1000ms

需要注意的是Spring Rabbit 提供的重连是阻塞式的,重连成功后或者失败后,才会向下执行(或抛异常)。因此如果对效率要求较高,可以关掉重连机制或者使用异步方式向队列中发送消息。

生产者确认

RabbitMQ有 Publisher ConfirmPublisher Return 两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
  • 持久消息投递到了MQ,并且完成了入对和持久化,返回ACK,告知投递成功。
  • 其他情况返回NACK,告知投递失败

确认机制配置

# 生产者开启确认机制  
# 开启publisher-confirm
spring.rabbitmq.publisher-confirm-type=correlated  
# 开启publisher-return
spring.rabbitmq.publisher-returns=true

publisher-comfirm-type有三种类型,分别是 correlated、none、simple

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执消息
  • correlated:MQ通过异步回调方式返回回执消息

发送消息代码

UserDTO userDTO = new UserDTO("zolmk", 123);  
// 使用消息确认回调机制  
// 用它来关联消息  
CorrelationData data = new CorrelationData(); 
// 获取Future对象,添加回调函数
data.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {  
    @Override  
    public void onFailure(Throwable ex) {  
        log.error("发生错误", ex);  
    }  
  
    @Override  
    public void onSuccess(CorrelationData.Confirm result) {  
        if (result.isAck()) {  
            log.info("消息发送成功!");  
        } else {  
            log.error("MQ接收消息失败,原因【{}】", result.getReason());  
        }  
    }  
});  
// 发送消息
rabbitTemplate.convertAndSend("exchange.direct", "cas", userDTO, data);

Spring-Rabbit使用CorrelationData来绑定Publisher Comfirm 回调消息和发送的消息。

Publisher Return机制
在上面我们通过配置开启了Publisher Return机制,仅仅开启是不够的,还需要添加回调函数。Publisher Return 机制的回调函数是绑定在RabbitTemplete上的,每个RabbitTemplete只能绑定一个Publisher Return回调函数。

可以通过如下方式(实现Aware接口)绑定Publisher Return回调函数。

@SpringBootApplication  
@Slf4j  
public class SpringBootRabbitmqApplication implements ApplicationContextAware {  
  
    public static void main(String[] args) {  
        SpringApplication.run(SpringBootRabbitmqApplication.class, args);  
    }  
  
  
    @Override  
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {  
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);  
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {  
            @Override  
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {  
                log.info("message: {} replyCode: {} replyText: {} exchange: {} routingKey: {}", message.toString(), replyCode, replyText, exchange, routingKey);            
}  
        });  
    }  
}

如何处理生产者的确认消息?

  • 生产者确认需要额外的网络和系统资源开销,尽量不要使用
  • 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是业务自己的问题
  • 对于nack消息可以有限次数重试,依然失败则记录异常消息

MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或者处理过慢时,会导致消息积压,引发MQ阻塞
数据持久化

RabbitMQ实现数据持久化包括3个方面:

  • 交换机持久化:创建交换机时选择duration
  • 队列持久化:同上
  • 消息持久化

消息积压引发的后果
如果消息达到了队列的最大值,会引发PageOut,PageOut的作用是将队列中积压的消息进行持久化,保存到磁盘中。但是PageOut会导致MQ阻塞住,此时无法所有的写入操作都将被阻塞。因此,我们需要避免PageOut发生。

消息持久化
消息持久化可以避免PageOut情况发生,它的做法是将每条消息都进行持久化操作,当队列满时,直接清空队列就可以,不会阻塞MQ。

在Spring Rabbit中,可以使用如下方法声明一个持久化消息:

Message message = MessageBuilder.withBody("Hello World.".getBytes(StandardCharsets.UTF_8))  
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();  

amqpTemplate.convertAndSend("hi", message);

这里的DeliveryMode就是用来设置一个消息的传递模式,PERSISTENT表示该消息为持久化消息。

LazyQueue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者消费消息时才会从磁盘中将数据加载到内存
  • 支持百万条的消息存储

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
可以通过代码或注解方式声明LazyQueue,示例如下:

@Bean  
Queue lazyQueue() {  
    return QueueBuilder.durable("hi").lazy().build();  
}

// 通过注解的方式
@RabbitListener(queuesToDeclare = @Queue(  
        name = "lazy.queue",  
        durable = "true",  
        // 这里添加了x-queue-mode参数
        arguments = @Argument(name = "x-queue-mode", value = "lazy")  
))  
public void handlerOne(Map map) {  
    System.out.println(map.toString());  
}
RabbitMQ如何保证消息的可靠性
  • 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
  • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后成为了队列的默认模式。LazyQueue会将所有消息都持久化。
  • 开启持久化和生产者确认后,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。

RabbitMQ通过将消息持久化到磁盘,从而保证了消息的可靠性,这里只是确保MQ接收到的消息不会因服务器宕机等问题导致消息丢失。

消费者可靠性

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中将消息删除
  • nack:消息处理失败,RabbitMQ重新投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会从MQ中删除。非常不安全,不建议使用。
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活。
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或者转换异常,自动返回reject

确认机制的配置项为:

// none 立即回复ack manual 手动发送 auto 由SpringAMQP通过AOP做增强自动发送
spring.rabbitmq.listener.simple.acknowledge-mode=auto
消费失败重试

失败重试机制
当消费者出现异常后,,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理压力飙升,带来不必要的压力。

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列,配置方法如下:

# 开启消费者重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
# 设置重试初始时间间隔
spring.rabbitmq.listener.simple.retry.initail-interval=1000ms
# 设置重试时间间隔倍率
spring.rabbitmq.listener.simple.retry.multipier=1
# 设置最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# true 无状态 false 有状态。如果业务中包含事务,这里改为false
spring.rabbitmq.listener.simple.retry.stateless=true

失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

一般推荐使用第三种方式,下面是代码示例:

@Configuration
// 只在spring.rabbitmq.listener.simple.retry.enabled配置项为true的时候生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")  
public class ErrorHandleConfig {  
    @Bean  
    public Queue errorQueue() {  
        return QueueBuilder.durable("error.queue").build();  
    }  
  
    @Bean  
    public Exchange errorExchange() {  
        return ExchangeBuilder.directExchange("error.direct").durable(true).build();  
    }  
  
    @Bean  
    public Binding binding4(Queue errorQueue, Exchange errorExchange) {  
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error").noargs();  
    }  
	// 使用RepublishMessageRecoverer将消息发送到相应的交换机
    @Bean  
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {  
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");  
    }  
}
消费者如何保证消息一定被消费?
  1. 开启消费者确认机制为auto,由Spring确认消息处理成功后返回ack,异常时返回nack
  2. 开启消费者处理失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。在程序开发总,则是指同一个业务,执行一次或执行多次对业务状态的影响是一致的。

如何保证业务幂等性呢?

唯一消息ID
是给每个消息都设置一个唯一id,利用id区分是否是重复消息:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

业务判断(类似于乐观锁)
结合业务逻辑,基于业务本身做判断。以商城系统为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其他状态不做处理。

使用Token

  1. 每个业务请求都携带一个Token,发起业务时,客户端先向服务器请求token
  2. 前端将token和请求一起发给服务器
  3. 服务器从redis中查询token,如果存在,则删除后执行后续业务;如果不存在,则返回业务重复操作错误。(这里判断存在和删除token需要保证==原子性==,可以使用lua脚本来实现)
如何保证支付服务与交易服务之间订单状态一致性?
  • 首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
  • 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。
如果交易服务消息处理失败,有没有什么兜底方案?

我们在交易服务设置了定时任务,定期查询订单支付状态。这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延时消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延时任务:设置在一定时间后才执行的任务。

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为==死信==(dead letter):

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信
    如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为==死信交换机==(Dead Letter Exchange,简称DLX)。

死信交换机本意是对一些未正常处理的消息进行通知,不推荐使用死信交换机来实现延时消息。RabbitMQ有提供了更好的方式来实现。

延迟消息插件

RabbitMQ的官方推出了一个插件,原生支持延时消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

创建并绑定延时交换机的代码如下:

@RabbitListener(bindings = @QueueBinding(  
        value = @Queue(name = "delay.queue", durable = "true"),  
        exchange = @Exchange(name = "delay.direct", type = "direct", durable = "true", delayed = "true"),  
        key = {"hi"}  
))  
public void listenDelayMessage(String msg) {  
    log.info("接收到延时消息:{}", msg);  
}

或使用代码创建:

@Bean  
public DirectExchange exchange23() {  
    return ExchangeBuilder.directExchange("delay.direct")  
            .durable(true)  
            .delayed()  
            .build();  
}

再发送消息时,需要通过消息头x-delay设置延时时间:

@Test  
void sendDelayMsg() {  
    String message = "hello, delay message";  
    rabbitTemplate.convertAndSend("delay.direct", "hi", message, new MessagePostProcessor() {  
        @Override  
        public Message postProcessMessage(Message message) throws AmqpException {  
            // 设置延时10秒  
            message.getMessageProperties().setDelay(10000);  
            return message;  
        }  
    });
取消超时订单

在商城系统中常常存在用户下单却迟迟不付款的情况,因为用户下了订单,系统会对该商品减库存,但是由于用户不付款,该商品无法被释放,就会存在有人抢不到商品,有人抢到不付款的情况。为此,一般是对订单添加一个时效,规定用户如果在时间内不付款,会自动取消订单。

设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

  • 如果并发较高,30分钟可能堆积的延迟消息过多,对MQ压力很大,MQ需要在极短时间内检测大量延迟消息,查看它们是否过期,非常消耗cpu资源
  • 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源

基于上面两个问题,我们可以对延迟时间进行分解,大多数用户从下单到支付都在10秒内,因此我们可以将总的30分钟进行划分,分别在10秒、20秒、...、30分钟的时候进行检测,如果用户已支付,就无需再入延迟队列了。