消息队列[1]--RabbitMQ


什么是消息队列

  • 消息:请求或应用间传输的数据
  • 队列:一种先进先出的数据结构
  • 消息队列:字面意思就是一种存放数据的先进先出的数据结构或者说容器

总的来说:消息队列是一种服务间 异步通信组件 ,主要解决应用解耦,异步处理,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性。

消息队列的使用场景

1、异步处理

场景:项目之初用户体量小,用户从选商品到支付一条龙,单机完成。后来用户体量扩大,新增很多其他服务,如消费卷,积分等。

由于调用链路的增长,支付耗时也随之增加,影响用户体验。由此引入消息队列(MQ),通过异步处理,提高响应速度。

支付系统支付成功后,只需要往 MQ 中,写入一条支付成功的消息。其他系统订阅后,异步处理各自业务。用户感觉到的时长就是:支付系统的消耗时间 + 写入MQ的消耗时间。

2、应用解耦

  • 不引入消息队列(MQ),那支付过程中的消费卷抵扣、积分抵扣等服务,就需要对应系统暴露接口,以便在支付系统中调用。
    如果此时需要新增一个邮件/短信系统的功能,就需要修改支付系统后重新发布应用。这也带来的不必要的维护成本。

  • 引入消息队列(MQ)后,支付系统不关心有哪些系统需要知道支付成功。支付系统只需向 MQ 中写入一条支付成功的消息,其他系统订阅该消息,在接收到消息时,自发的进行自身的业务即可。

    3、流量削峰

    平时流量比较低,但是一到大促(比如秒杀场景),流量在某一时段突然暴增,可能出现远超服务所能承载的请求打入,导致服务宕机无法对外提供服务。

这种情况下引入消息队列,在大量请求打入服务时,先将请求写入队列,然后服务再按自身处理能力从MQ中拉取消息进行消费,这就达到了流量削峰的作用。

RabbitMQ

RabbitMQ 虽然再吞吐量上不及 kafka、rocketMQ,但是其高效的响应性,消息可靠性上是无与伦比的。完全可以支持用户体量不是那么高的应用。

1、RabblitMQ的工作模型

  • producer:生产者
  • connection:生产者/消费者 和 MQ 之间的TCP连接
  • channel:connection内部建立的虚拟连接
  • broker:MQ 中接受和分发消息的应用
  • Virtual Host:为多租户考虑和安全因素考虑的虚拟分组,不同组的用户不能相互访问
  • Exchange:交换机,生产者将消息发送到交换机,再有交换机发送到队列中
  • Queue:队列,消息在队列中等待被消费者消费
  • Binding:交换机和队列的绑定关系
  • consumer:消费者,消息的最终使用者

消息生产者通过connection连接到RabbitMQ,然后建立虚拟连接[channel],通过channel将消息发送到exchange,因为exchange和queue绑定,所以会通过exchange直接发送并保存在queue队列中,最后消费者通过connection连接到RabbitMQ,然后建立虚拟连接[channel],通过channel从queue中获取并消费消息。

2、RabbitMQ 常用的5种工作模式【共7种】

  1. 简单模式(hello world)
    在下图中,“P”表示消息的生产者【producer】,“C”表示消费者【consumer】,中间的框是一个队列-RabbitMQ保留消息的缓冲区。

    在这种模式下,一个生产者、一个队列、一个消费者(其实还有一个默认的,不需要我们声明的exchange

  2. 工作队列模式

    在这种模式下,一个生产者、一个队列,但是存在多个消费者,多个消费者共同消费队列里的消息。在这种模式下一条消息 只会 被一个消费者消费其实还有一个默认的,不需要我们声明的exchange

  3. 发布订阅模式
    在下图中,“X”表示交换机【exchange】,需要我们手动声明。

    在这种模式下,一个生产者,一个交换机,交换机绑定多个队列,每个队列存在一个消费者。这种模式下,生产者发送的消息,每个消费者都会消费(每个消费者收到的消息是一致的)。

  4. Routing 匹配模式

    在这种模式下,一个生产者,一个交换机,交换机绑定多个队列,每个队列存在一个消费者。在结构上和发布订阅模式完全一致,区别在于交换机绑定队列时,给每个队列设置Routing key属性,生产者发送消息时,会携带Routing key,根据Routing key交换机会将消息发送到不同的队列中

  5. Topic 匹配模式

    在这种模式下,一个生产者,一个交换机,交换机绑定多个队列,每个队列存在一个消费者。在结构和原理上和Routing 匹配模式非常相似,区别在于Routing匹配模式下,routing key 是一个固定的值,需要完成匹配,而Topic 匹配模式支持模糊匹配topic 模式通配符:“ * ”:匹配一个词;“ # ”:匹配一个或多个词,ams.* 能匹配 ams.insert; ams.# 可以匹配 ams.insert.abc 和 ams.insert

    3、RabbitMQ 和SpringBoot 的整合

    整合相关代码已上传至github zhouxh-z

    3.1、生产者

  • jar包引入
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency> 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
  • yml配置文件
    spring:
      rabbitmq:
        port: 5672
        username: guest
        password: guest
        host: 101.37.10.135
        virtual-host: /
        template:
          exchange: xxxxExchange
          default-receive-queue: xxxxQueue
          routing-key: xxxx.#
    
  • 配置类
    配置类的作用:项目启动后在 RabbitMQ 中创建队列,创建交换机,并和队列绑定。
    创建时间:项目启动后第一次连接mq时。如果手动创建队列/交换机,并绑定,则不需要这个配置类
    @Configuration
    public class ProducerConfig {
        @Autowired
        public BeanFactory beanFactory;
        @Value("${spring.rabbitmq.template.exchange}")
        public String exchangeName;
        @Value("${spring.rabbitmq.template.default-receive-queue}")
        public String queueName;
        @Value("${spring.rabbitmq.template.routing-key}")
        public String routeName;
        
        @Bean("exchange")
        public Exchange buildExchange(){
           return ExchangeBuilder.topicExchange(exchangeName).build();
        }
        
        @Bean("queue")
        public Queue buildQueue(){
            return QueueBuilder.durable(queueName).build();
        }
        
        @Bean("binder")
        public Binding buildBinder(){
            Queue queue = (Queue) beanFactory.getBean("queue");
            Exchange exchange = (Exchange) beanFactory.getBean("exchange");
            return BindingBuilder.bind(queue)
                   .to(exchange).with(routeName).noargs();
        }
    }
    
  • 消息发送类
    @RestController
    public class ProducerSender {
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Value("${spring.rabbitmq.template.exchange}")
        public String exchangeName;
        @Value("${spring.rabbitmq.template.routing-key}")
        public String routeName;
        @RequestMapping("/sendMSG")
        public void sendMSG(){
            rabbitTemplate.convertAndSend(exchangeName,routeName,"testMSG");
        }
    }
    

    3.2、消费者

  • 依赖和配置文件
    与生产者一致
  • 消息接收类
    @Component
    public class Consumer {
        // ackMode = "MANUAL" 如果需要手动签收消息,则设置,否则不需要
        @RabbitListener(queues = "zhouxhQueue",ackMode = "MANUAL")
        public void consumer(Message ms, Channel channel) throws IOException {
            // 执行业务,伪代码
            System.out.println(ms);
            // 手动签收
            channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true);
            // 拒收,第二个 Boolean 入参表示消息是否 返回 对列,false表示消息直接丢弃
            // 如果返回队列,存在一个问题 ,该消息将一直重新被消费,需要手动实现重试次数,比如重试次数保存到缓存中,进行手动判断
            //channel.basicNack(ms.getMessageProperties().getDeliveryTag(),true,true);
        }
    }
    

    4、RebbitMQ 高可用集群搭建

  1. 首先搭建好两台单机 RabbitMQ,并分别启动
  2. rabbitmqctl stop_app 两台单机 RabbitMQ 都停止对外服务
  3. rabbitmqctl join_cluster rabbit@101.37.10.135 分别将两台 RabbitMQ 加入服务
  4. rabbitmqctl start_app 分别将两台 RabbitMQ 重新开启对外服务
  5. 通过rabbitmqctl cluster_status 就可以查看 RabbitMQ 的集群状态了

    5、RabbitMQ 高级特性

5.1、RabbitMQ 消息的可靠性传递是如何保障的?

  1. 从生产者到交换机,Confirm 确认模式(springboot-rabbitmq 默认开启)

    • 生产者将消息发送到exchange时,会通过confirm回调,告知生产者发送状态。

        //定义回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 相关配置信息
             * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm方法被执行了....");
      
                 //ack 为  true表示 消息已经到达交换机
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });
      
  2. 从交换机到队列,return 回退模式 (开启 rabbitmq.publisher-returns: true)

    • exchange 收到消息后,通过routing key匹配对应的 queue时,会通过return 回调,告知生产者发送状态

        //设置交换机处理失败消息的模式   为true的时候,消息达到不了 队列时,会将消息重新返回给生产者
        rabbitTemplate.setMandatory(true);
      
        //定义回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");
      
                System.out.println("message:"+message);
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
      
                //处理
            }
        });
      
  3. 从队列到消费者,签收机制(开启:acknowledge-mode: manual)

    • 消费者从队列中拉取消息,如果在消息消费过程中出现异常,我们可以手动拒收消息,然后重新拉取消息。否则正常签收。
      // 手动签收
      channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true);
      // 拒收,第二个 Boolean 入参表示消息是否 返回 对列
      // 如果返回队列,存在一个问题 ,该消息将一直重新被消费,需要手动实现重试次数,比如保存到缓存中
      channel.basicNack(ms.getMessageProperties().getDeliveryTag(),true,true);
      
  4. exchange、queue、message 等持久化

    5.2、限流

    在大促或者秒杀场景下,生产者瞬间打入大量消息,可能因为消费者消费能力不够的问题,导致服务宕机。
    应对这种瞬间的大流量,消费者的限流是非常有必要的。

通过指定监听器工厂,可以达到限流的效果

@Configuration
public class RabbitMqConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    
    @Bean(name = "listenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置限流数量
        factory.setPrefetchCount(50);
        return factory;
    }
}

@RabbitListener 注解中指定 containerFactory

@RabbitListener(queues = "zhouxhQueue",ackMode = "MANUAL",containerFactory = "listenerContainer")
public void consumer(Message ms, Channel channel) throws IOException {
    System.out.println(ms);
    // 手动签收
    channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true)  
}

5.3、延迟队列

RabbitMQ 没有自带的延迟队列,但是我们可以通过 TTL队列 和 死信队列 完成延迟队列的效果。

  1. TTL:配置了超时时间的队列,该队列中的消息达到超时时间还没有被消费,会得删除
  2. 死信队列:对于RabbitMQ来说,其实是死信交换机,我们在一个队列中绑定一个死信交换机,然后当消息死信后就会通过这个死信交换机,进入一个普通队列。
    下图中的“DLX”其实就是DEAD LETTER EXCHANGE(死信交换机)。

进入死心队列的三种方式

  • 队列中的消息已经满了,此时新进入的消息会直接进入死信队列
  • 消费者拒绝签收,且不重入队列
  • 原队列设置了超时时间,消息在队列中达到超时时间没有被消费

那么延迟队列是怎么根据这两种队列完成的呢?
以我们在网上购物,下单了但是还没付款这样一个场景来说明:

  1. 首先我们在订单系统下单,将订单信息发送到TTL队列[假设订单保存时间为30分钟,即30分钟内付款,订单有效,否则订单过期]
  2. 期间我们都没有付款,那么过了30分钟该订单信息会从TTL队列中删除。
  3. 从TTL队列中删除的订单会被发送到死信队列,然后这个消息会被库存系统消费。判断是取消订单,还是正常发货。

    5.4、消息幂等性保障(消息重复读取)

    在发送消息时,携带version,在消费者消费并将消息保存在数据库时,通过乐观锁机制,保障幂等性。

5.5、消息积压问题

由于大促或者秒杀等场景,导致的消息积压问题。

  1. 通过工作队列模式增加消费者,提高消息消费速度。
  2. 消费者先将消息写入数据库,然后业务代码从数据库读取消息进行处理

文章作者: zhouxh-z
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 zhouxh-z !
 上一篇
一文解读 redis 主从/哨兵/集群架构 一文解读 redis 主从/哨兵/集群架构
主从主从结构示意图: 主从架构搭建,配置从节点: 复制 redis.conf 文件,并修改如下配置 port 8001 pidfile /var/redis_8001.pid logfile 8001.log dir /usr/loca
2021-03-10
下一篇 
spring源码研读-AOP-三步走? spring源码研读-AOP-三步走?
本文将基于java-config(注解)研读 spring-AOP 源码 AOP的使用 配置类上添加@EnableAspectJAutoProxy注解 [开启AOP功能] AOP bean对象添加@Aspect注解 @Aspect @Co
2021-03-10
  目录