RabbitMQ 基础篇

13

消息队列用来将同步处理流程异步化,实现低时延服务。

同步调用与异步调用

同步调用

同步调用存在的问题

  • 扩展性差
  • 性能下降
  • 级联失败问题

同步调用的优势

  • 实效性强,等待到结果后才返回
异步调用

异步调用通常使用消息队列来实现。主要包含了下面三种角色:

  • 消息发送者:负责生产消息,并将消息发送给消息代理
  • 消息代理:管理、暂存、转发消息
  • 消息接收者:接收和处理消息

优势:

  • 解除耦合,扩展性强
  • 无需等待,性能好
  • 鼓掌隔离
  • 缓存消息,流量削峰填谷

缺点:

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于Broker的可靠性

数据隔离

通过虚拟主机可以实现数据隔离,不同主机之间数据不互通。一个用户不能对其他用户的虚拟主机进行读写操作。

AMQP

全称为高级消息队列协议(Advanced Message Queuing Protocol),是用于应用程序之间传递业务消息的开放标准。该协议与平台和语言无关,更符合微服务中独立性的要求。

Spring AMQP
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。包含两部分,其中spring-amqp是抽象层,spring-rabbit是实现层。

Work模型

在一条消息队列上绑定多个消费者,多个消费者同时对一个队列上的消息进行处理,每条消息仅会被处理一次,从而加快消息的处理速度。

消费者消息推送限制

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

如果想要改变这种机制,重复利用不对等的系统资源,能者多劳,需要修改配置文件,设置spring.rabbitmq.listener.simple.prefetch的值为1,确保每次只获取一条消息,处理完成后才能获取下一个消息。

交换机

Fanout交换机

Fanout Exchange 会将消息广播到每一个绑定的消息队列。

Direct交换机

Direct Exchange 会将接收到的消息根据路由规则路由到指定的Queue,因此称为 定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange 将消息路由到BindingKey与消息RoutingKey一致的队列
Topic交换机

Topic Exchange 和 Direct Exchange 类似,区别在于routingKey可以是多个单词的列表,并且以.分隔。
Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

声明队列和交换机

方式一:通过注入Bean

SpringAMQP提供了下面几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

可以用以下方式创建和绑定 Exchange和Queue:

/**  
 * 声明一个队列,如果不存在,会进行创建  
 * @return  
 */  
@Bean(name = "q1")  
public Queue queue() {  
    return new Queue("fy");  
}  
/**  
 * 声明一个队列,如果不存在,会进行创建  
 * @return  
 */  
@Bean(name = "q2")  
public Queue queue2() {  
    return new Queue("fanout.queue");  
}
@Bean(name = "de")  
public Exchange directExchange() {  
    return new DirectExchange("exchange.direct");  
}  
@Bean  
public Binding binding1(Queue q1, Exchange de) {  
    return BindingBuilder.bind(q1).to(de).with("cas").noargs();  
}  
@Bean  
public Binding binding2(Queue q2, Exchange de) {  
    return BindingBuilder.bind(q2).to(de).with("tt").noargs();  
}

可以看到这种方式非常麻烦,需要声明的Bean太多了。推荐使用方式二进行声明和绑定交换机。

方式二:通过注解

使用RabbitLinstener、QueueBinding、Queue、Exchange配合完成队列和交换机的创建以及它们之间的绑定。

@RabbitListener(bindings = @QueueBinding(  
        value = @Queue("topic.queue.1"),  
        exchange = @Exchange(name = "ech2.topic", type = ExchangeTypes.TOPIC),  
        key = {"ch.*"}  
))  
public void handlerOne(String s) {  
    System.out.println("ch.* : "+s);  
}  


@RabbitListener(bindings = @QueueBinding(  
        value = @Queue("topic.queue.2"),  
        exchange = @Exchange(name = "ech2.topic", type = ExchangeTypes.TOPIC),  
        key = {"*.news"}  
))  
public void handlerTwo(String s) {  
    System.out.println("*.news : "+s);  
}

消息转换器

消息转换器(Message Converter)的作用是将我们填入的Object对象转换为可在网络上传输的IO字节或字符流,同时也需要支持反向转换。
Spring Rabbit默认情况下使用的消息转换器是SimpleMessageConvetor,它的工作流程如下:

  • 是字符串:直接发送字符串
  • 实现了Serializable接口,则使用JDK提供的序列化机制。
  • 否则,抛出异常

由于JDK序列化后的对象字符串可读性较差,且JDK序列化效率不高,因此我们可以使用Jackson完成消息转换,转换后的对象格式为JSON字符串。

Spring Rabbit提供了Jackson2JsonMessageConverter类来支持使用Jackson完成消息转换,只需要完成如下步骤即可:

  1. 引入jackson依赖
<dependency>  
    <groupId>com.fasterxml.jackson.core</groupId>  
    <artifactId>jackson-databind</artifactId>  
</dependency>
  1. 将Jackson2JsonMessageConverter添加到容器中
@Bean  
MessageConverter jackson2JsonMessageConverter() {  
    return new Jackson2JsonMessageConverter();  
}