RabbitMQ 基础篇
消息队列用来将同步处理流程异步化,实现低时延服务。
同步调用与异步调用
同步调用
同步调用存在的问题
- 扩展性差
- 性能下降
- 级联失败问题
同步调用的优势
- 实效性强,等待到结果后才返回
异步调用
异步调用通常使用消息队列来实现。主要包含了下面三种角色:
- 消息发送者:负责生产消息,并将消息发送给消息代理
- 消息代理:管理、暂存、转发消息
- 消息接收者:接收和处理消息
优势:
- 解除耦合,扩展性强
- 无需等待,性能好
- 鼓掌隔离
- 缓存消息,流量削峰填谷
缺点:
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于Broker的可靠性
数据隔离
通过虚拟主机可以实现数据隔离,不同主机之间数据不互通。一个用户不能对其他用户的虚拟主机进行读写操作。
AMQP
全称为高级消息队列协议(Advanced Message Queuing Protocol),是用于应用程序之间传递业务消息的开放标准。该协议与平台和语言无关,更符合微服务中独立性的要求。
Spring AMQP
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。包含两部分,其中spring-amqp是抽象层,spring-rabbit是实现层。
Work模型
在一条消息队列上绑定多个消费者,多个消费者同时对一个队列上的消息进行处理,每条消息仅会被处理一次,从而加快消息的处理速度。
消费者消息推送限制
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
如果想要改变这种机制,重复利用不对等的系统资源,能者多劳,需要修改配置文件,设置spring.rabbitmq.listener.simple.prefetch
的值为1,确保每次只获取一条消息,处理完成后才能获取下一个消息。
交换机
Fanout交换机
Fanout Exchange 会将消息广播到每一个绑定的消息队列。
Direct交换机
Direct Exchange 会将接收到的消息根据路由规则路由到指定的Queue,因此称为 定向路由。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange 将消息路由到BindingKey与消息RoutingKey一致的队列
Topic交换机
Topic Exchange 和 Direct Exchange 类似,区别在于routingKey可以是多个单词的列表,并且以.分隔。
Queue与Exchange指定BindingKey时可以使用通配符:
#
:代指0个或多个单词*
:代指一个单词
声明队列和交换机
方式一:通过注入Bean
SpringAMQP提供了下面几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
可以用以下方式创建和绑定 Exchange和Queue:
/**
* 声明一个队列,如果不存在,会进行创建
* @return
*/
@Bean(name = "q1")
public Queue queue() {
return new Queue("fy");
}
/**
* 声明一个队列,如果不存在,会进行创建
* @return
*/
@Bean(name = "q2")
public Queue queue2() {
return new Queue("fanout.queue");
}
@Bean(name = "de")
public Exchange directExchange() {
return new DirectExchange("exchange.direct");
}
@Bean
public Binding binding1(Queue q1, Exchange de) {
return BindingBuilder.bind(q1).to(de).with("cas").noargs();
}
@Bean
public Binding binding2(Queue q2, Exchange de) {
return BindingBuilder.bind(q2).to(de).with("tt").noargs();
}
可以看到这种方式非常麻烦,需要声明的Bean太多了。推荐使用方式二进行声明和绑定交换机。
方式二:通过注解
使用RabbitLinstener、QueueBinding、Queue、Exchange配合完成队列和交换机的创建以及它们之间的绑定。
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue.1"),
exchange = @Exchange(name = "ech2.topic", type = ExchangeTypes.TOPIC),
key = {"ch.*"}
))
public void handlerOne(String s) {
System.out.println("ch.* : "+s);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue.2"),
exchange = @Exchange(name = "ech2.topic", type = ExchangeTypes.TOPIC),
key = {"*.news"}
))
public void handlerTwo(String s) {
System.out.println("*.news : "+s);
}
消息转换器
消息转换器(Message Converter)的作用是将我们填入的Object对象转换为可在网络上传输的IO字节或字符流,同时也需要支持反向转换。
Spring Rabbit默认情况下使用的消息转换器是SimpleMessageConvetor,它的工作流程如下:
- 是字符串:直接发送字符串
- 实现了
Serializable
接口,则使用JDK提供的序列化机制。 - 否则,抛出异常
由于JDK序列化后的对象字符串可读性较差,且JDK序列化效率不高,因此我们可以使用Jackson完成消息转换,转换后的对象格式为JSON字符串。
Spring Rabbit提供了Jackson2JsonMessageConverter类来支持使用Jackson完成消息转换,只需要完成如下步骤即可:
- 引入jackson依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 将Jackson2JsonMessageConverter添加到容器中
@Bean
MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}