RabbitMQ 高级篇
基础篇只介绍了RabbitMQ的简单使用,但在一些稍微复杂的情况下,基础篇的内容无法保证数据的一致性。
生产者可靠性
生产者重连
有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
# 连接超时时间
spring.rabbitmq.connection-timeout=1000ms
# 开启重连机制
spring.rabbitmq.template.retry.enabled=true
# 每次重连时间间隔的倍率
spring.rabbitmq.template.retry.multiplier=1
# 最大重连次数
spring.rabbitmq.template.retry.max-attempts=3
# 初始的重连间隔
spring.rabbitmq.template.retry.initial-interval=1000ms
需要注意的是Spring Rabbit 提供的重连是阻塞式的,重连成功后或者失败后,才会向下执行(或抛异常)。因此如果对效率要求较高,可以关掉重连机制或者使用异步方式向队列中发送消息。
生产者确认
RabbitMQ有 Publisher Confirm 和 Publisher Return 两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
- 持久消息投递到了MQ,并且完成了入对和持久化,返回ACK,告知投递成功。
- 其他情况返回NACK,告知投递失败
确认机制配置
# 生产者开启确认机制
# 开启publisher-confirm
spring.rabbitmq.publisher-confirm-type=correlated
# 开启publisher-return
spring.rabbitmq.publisher-returns=true
publisher-comfirm-type有三种类型,分别是 correlated、none、simple
- none:关闭confirm机制
- simple:同步阻塞等待MQ的回执消息
- correlated:MQ通过异步回调方式返回回执消息
发送消息代码
UserDTO userDTO = new UserDTO("zolmk", 123);
// 使用消息确认回调机制
// 用它来关联消息
CorrelationData data = new CorrelationData();
// 获取Future对象,添加回调函数
data.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("发生错误", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result.isAck()) {
log.info("消息发送成功!");
} else {
log.error("MQ接收消息失败,原因【{}】", result.getReason());
}
}
});
// 发送消息
rabbitTemplate.convertAndSend("exchange.direct", "cas", userDTO, data);
Spring-Rabbit使用CorrelationData来绑定Publisher Comfirm 回调消息和发送的消息。
Publisher Return机制
在上面我们通过配置开启了Publisher Return机制,仅仅开启是不够的,还需要添加回调函数。Publisher Return 机制的回调函数是绑定在RabbitTemplete
上的,每个RabbitTemplete
只能绑定一个Publisher Return回调函数。
可以通过如下方式(实现Aware接口)绑定Publisher Return回调函数。
@SpringBootApplication
@Slf4j
public class SpringBootRabbitmqApplication implements ApplicationContextAware {
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqApplication.class, args);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("message: {} replyCode: {} replyText: {} exchange: {} routingKey: {}", message.toString(), replyCode, replyText, exchange, routingKey);
}
});
}
}
如何处理生产者的确认消息?
- 生产者确认需要额外的网络和系统资源开销,尽量不要使用
- 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是业务自己的问题
- 对于nack消息可以有限次数重试,依然失败则记录异常消息
MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或者处理过慢时,会导致消息积压,引发MQ阻塞
数据持久化
RabbitMQ实现数据持久化包括3个方面:
- 交换机持久化:创建交换机时选择duration
- 队列持久化:同上
- 消息持久化
消息积压引发的后果
如果消息达到了队列的最大值,会引发PageOut,PageOut的作用是将队列中积压的消息进行持久化,保存到磁盘中。但是PageOut会导致MQ阻塞住,此时无法所有的写入操作都将被阻塞。因此,我们需要避免PageOut发生。
消息持久化
消息持久化可以避免PageOut情况发生,它的做法是将每条消息都进行持久化操作,当队列满时,直接清空队列就可以,不会阻塞MQ。
在Spring Rabbit中,可以使用如下方法声明一个持久化消息:
Message message = MessageBuilder.withBody("Hello World.".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
amqpTemplate.convertAndSend("hi", message);
这里的DeliveryMode就是用来设置一个消息的传递模式,PERSISTENT表示该消息为持久化消息。
LazyQueue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者消费消息时才会从磁盘中将数据加载到内存
- 支持百万条的消息存储
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
可以通过代码或注解方式声明LazyQueue,示例如下:
@Bean
Queue lazyQueue() {
return QueueBuilder.durable("hi").lazy().build();
}
// 通过注解的方式
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
// 这里添加了x-queue-mode参数
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void handlerOne(Map map) {
System.out.println(map.toString());
}
RabbitMQ如何保证消息的可靠性
- 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
- RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后成为了队列的默认模式。LazyQueue会将所有消息都持久化。
- 开启持久化和生产者确认后,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。
RabbitMQ通过将消息持久化到磁盘,从而保证了消息的可靠性,这里只是确保MQ接收到的消息不会因服务器宕机等问题导致消息丢失。
消费者可靠性
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中将消息删除
- nack:消息处理失败,RabbitMQ重新投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
none
:不处理。即消息投递给消费者后立刻ack,消息会从MQ中删除。非常不安全,不建议使用。manual
:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活。auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack
- 如果是消息处理或者转换异常,自动返回
reject
- 如果是业务异常,会自动返回
确认机制的配置项为:
// none 立即回复ack manual 手动发送 auto 由SpringAMQP通过AOP做增强自动发送
spring.rabbitmq.listener.simple.acknowledge-mode=auto
消费失败重试
失败重试机制
当消费者出现异常后,,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理压力飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列,配置方法如下:
# 开启消费者重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
# 设置重试初始时间间隔
spring.rabbitmq.listener.simple.retry.initail-interval=1000ms
# 设置重试时间间隔倍率
spring.rabbitmq.listener.simple.retry.multipier=1
# 设置最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# true 无状态 false 有状态。如果业务中包含事务,这里改为false
spring.rabbitmq.listener.simple.retry.stateless=true
失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
一般推荐使用第三种方式,下面是代码示例:
@Configuration
// 只在spring.rabbitmq.listener.simple.retry.enabled配置项为true的时候生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorHandleConfig {
@Bean
public Queue errorQueue() {
return QueueBuilder.durable("error.queue").build();
}
@Bean
public Exchange errorExchange() {
return ExchangeBuilder.directExchange("error.direct").durable(true).build();
}
@Bean
public Binding binding4(Queue errorQueue, Exchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error").noargs();
}
// 使用RepublishMessageRecoverer将消息发送到相应的交换机
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
消费者如何保证消息一定被消费?
- 开启消费者确认机制为auto,由Spring确认消息处理成功后返回ack,异常时返回nack
- 开启消费者处理失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机
业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。在程序开发总,则是指同一个业务,执行一次或执行多次对业务状态的影响是一致的。
如何保证业务幂等性呢?
唯一消息ID
是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
业务判断(类似于乐观锁)
结合业务逻辑,基于业务本身做判断。以商城系统为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其他状态不做处理。
使用Token
- 每个业务请求都携带一个Token,发起业务时,客户端先向服务器请求token
- 前端将token和请求一起发给服务器
- 服务器从redis中查询token,如果存在,则删除后执行后续业务;如果不存在,则返回业务重复操作错误。(这里判断存在和删除token需要保证==原子性==,可以使用lua脚本来实现)
如何保证支付服务与交易服务之间订单状态一致性?
- 首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
- 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。
如果交易服务消息处理失败,有没有什么兜底方案?
我们在交易服务设置了定时任务,定期查询订单支付状态。这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
延时消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延时任务:设置在一定时间后才执行的任务。
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为==死信==(dead letter):
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为==死信交换机==(Dead Letter Exchange,简称DLX)。
死信交换机本意是对一些未正常处理的消息进行通知,不推荐使用死信交换机来实现延时消息。RabbitMQ有提供了更好的方式来实现。
延迟消息插件
RabbitMQ的官方推出了一个插件,原生支持延时消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
创建并绑定延时交换机的代码如下:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", type = "direct", durable = "true", delayed = "true"),
key = {"hi"}
))
public void listenDelayMessage(String msg) {
log.info("接收到延时消息:{}", msg);
}
或使用代码创建:
@Bean
public DirectExchange exchange23() {
return ExchangeBuilder.directExchange("delay.direct")
.durable(true)
.delayed()
.build();
}
再发送消息时,需要通过消息头x-delay设置延时时间:
@Test
void sendDelayMsg() {
String message = "hello, delay message";
rabbitTemplate.convertAndSend("delay.direct", "hi", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置延时10秒
message.getMessageProperties().setDelay(10000);
return message;
}
});
取消超时订单
在商城系统中常常存在用户下单却迟迟不付款的情况,因为用户下了订单,系统会对该商品减库存,但是由于用户不付款,该商品无法被释放,就会存在有人抢不到商品,有人抢到不付款的情况。为此,一般是对订单添加一个时效,规定用户如果在时间内不付款,会自动取消订单。
设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
- 如果并发较高,30分钟可能堆积的延迟消息过多,对MQ压力很大,MQ需要在极短时间内检测大量延迟消息,查看它们是否过期,非常消耗cpu资源
- 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源
基于上面两个问题,我们可以对延迟时间进行分解,大多数用户从下单到支付都在10秒内,因此我们可以将总的30分钟进行划分,分别在10秒、20秒、...、30分钟的时候进行检测,如果用户已支付,就无需再入延迟队列了。