什么是消息队列
- 消息:请求或应用间传输的数据
- 队列:一种先进先出的数据结构
- 消息队列:字面意思就是一种存放数据的先进先出的数据结构或者说容器
总的来说:消息队列是一种服务间 异步通信组件 ,主要解决应用解耦,异步处理,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性。
消息队列的使用场景
1、异步处理
场景:项目之初用户体量小,用户从选商品到支付一条龙,单机完成。后来用户体量扩大,新增很多其他服务,如消费卷,积分等。
由于调用链路的增长,支付耗时也随之增加,影响用户体验。由此引入消息队列(MQ),通过异步处理,提高响应速度。
支付系统支付成功后,只需要往 MQ 中,写入一条支付成功的消息。其他系统订阅后,异步处理各自业务。用户感觉到的时长就是:支付系统的消耗时间 + 写入MQ的消耗时间。
2、应用解耦
不引入消息队列(MQ),那支付过程中的消费卷抵扣、积分抵扣等服务,就需要对应系统暴露接口,以便在支付系统中调用。
如果此时需要新增一个邮件/短信系统的功能,就需要修改支付系统后重新发布应用。这也带来的不必要的维护成本。引入消息队列(MQ)后,支付系统不关心有哪些系统需要知道支付成功。支付系统只需向 MQ 中写入一条支付成功的消息,其他系统订阅该消息,在接收到消息时,自发的进行自身的业务即可。
3、流量削峰
平时流量比较低,但是一到大促(比如秒杀场景),流量在某一时段突然暴增,可能出现远超服务所能承载的请求打入,导致服务宕机无法对外提供服务。
这种情况下引入消息队列,在大量请求打入服务时,先将请求写入队列,然后服务再按自身处理能力从MQ中拉取消息进行消费,这就达到了流量削峰的作用。
RabbitMQ
RabbitMQ 虽然再吞吐量上不及 kafka、rocketMQ,但是其高效的响应性,消息可靠性上是无与伦比的。完全可以支持用户体量不是那么高的应用。
1、RabblitMQ的工作模型
- producer:生产者
- connection:生产者/消费者 和 MQ 之间的TCP连接
- channel:connection内部建立的虚拟连接
- broker:MQ 中接受和分发消息的应用
- Virtual Host:为多租户考虑和安全因素考虑的虚拟分组,不同组的用户不能相互访问
- Exchange:交换机,生产者将消息发送到交换机,再有交换机发送到队列中
- Queue:队列,消息在队列中等待被消费者消费
- Binding:交换机和队列的绑定关系
- consumer:消费者,消息的最终使用者
消息生产者通过connection连接到RabbitMQ,然后建立虚拟连接[channel],通过channel将消息发送到exchange,因为exchange和queue绑定,所以会通过exchange直接发送并保存在queue队列中,最后消费者通过connection连接到RabbitMQ,然后建立虚拟连接[channel],通过channel从queue中获取并消费消息。
2、RabbitMQ 常用的5种工作模式【共7种】
简单模式(hello world)
在下图中,“P”表示消息的生产者【producer】,“C”表示消费者【consumer】,中间的框是一个队列-RabbitMQ保留消息的缓冲区。在这种模式下,一个生产者、一个队列、一个消费者(
其实还有一个默认的,不需要我们声明的exchange
)工作队列模式
在这种模式下,一个生产者、一个队列,但是存在多个消费者,多个消费者共同消费队列里的消息。在这种模式下一条消息 只会 被一个消费者消费(
其实还有一个默认的,不需要我们声明的exchange
)发布订阅模式
在下图中,“X”表示交换机【exchange】,需要我们手动声明。在这种模式下,一个生产者,一个交换机,交换机绑定多个队列,每个队列存在一个消费者。这种模式下,生产者发送的消息,每个消费者都会消费(每个消费者收到的消息是一致的)。
Routing 匹配模式
在这种模式下,一个生产者,一个交换机,交换机绑定多个队列,每个队列存在一个消费者。在结构上和发布订阅模式完全一致,区别在于交换机绑定队列时,给每个队列设置Routing key属性,生产者发送消息时,会携带Routing key,根据Routing key交换机会将消息发送到不同的队列中
Topic 匹配模式
在这种模式下,一个生产者,一个交换机,交换机绑定多个队列,每个队列存在一个消费者。在结构和原理上和Routing 匹配模式非常相似,区别在于Routing匹配模式下,routing key 是一个固定的值,需要完成匹配,而Topic 匹配模式支持模糊匹配。
topic 模式通配符:“ * ”:匹配一个词;“ # ”:匹配一个或多个词,ams.* 能匹配 ams.insert; ams.# 可以匹配 ams.insert.abc 和 ams.insert
3、RabbitMQ 和SpringBoot 的整合
整合相关代码已上传至github zhouxh-z
3.1、生产者
- jar包引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
- yml配置文件
spring: rabbitmq: port: 5672 username: guest password: guest host: 101.37.10.135 virtual-host: / template: exchange: xxxxExchange default-receive-queue: xxxxQueue routing-key: xxxx.#
- 配置类
配置类的作用:项目启动后在 RabbitMQ 中创建队列,创建交换机,并和队列绑定。
创建时间:项目启动后第一次连接mq时。如果手动创建队列/交换机,并绑定,则不需要这个配置类
@Configuration public class ProducerConfig { @Autowired public BeanFactory beanFactory; @Value("${spring.rabbitmq.template.exchange}") public String exchangeName; @Value("${spring.rabbitmq.template.default-receive-queue}") public String queueName; @Value("${spring.rabbitmq.template.routing-key}") public String routeName; @Bean("exchange") public Exchange buildExchange(){ return ExchangeBuilder.topicExchange(exchangeName).build(); } @Bean("queue") public Queue buildQueue(){ return QueueBuilder.durable(queueName).build(); } @Bean("binder") public Binding buildBinder(){ Queue queue = (Queue) beanFactory.getBean("queue"); Exchange exchange = (Exchange) beanFactory.getBean("exchange"); return BindingBuilder.bind(queue) .to(exchange).with(routeName).noargs(); } }
- 消息发送类
@RestController public class ProducerSender { @Autowired RabbitTemplate rabbitTemplate; @Value("${spring.rabbitmq.template.exchange}") public String exchangeName; @Value("${spring.rabbitmq.template.routing-key}") public String routeName; @RequestMapping("/sendMSG") public void sendMSG(){ rabbitTemplate.convertAndSend(exchangeName,routeName,"testMSG"); } }
3.2、消费者
- 依赖和配置文件
与生产者一致 - 消息接收类
@Component public class Consumer { // ackMode = "MANUAL" 如果需要手动签收消息,则设置,否则不需要 @RabbitListener(queues = "zhouxhQueue",ackMode = "MANUAL") public void consumer(Message ms, Channel channel) throws IOException { // 执行业务,伪代码 System.out.println(ms); // 手动签收 channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true); // 拒收,第二个 Boolean 入参表示消息是否 返回 对列,false表示消息直接丢弃 // 如果返回队列,存在一个问题 ,该消息将一直重新被消费,需要手动实现重试次数,比如重试次数保存到缓存中,进行手动判断 //channel.basicNack(ms.getMessageProperties().getDeliveryTag(),true,true); } }
4、RebbitMQ 高可用集群搭建
- 首先搭建好两台单机 RabbitMQ,并分别启动
rabbitmqctl stop_app
两台单机 RabbitMQ 都停止对外服务rabbitmqctl join_cluster rabbit@101.37.10.135
分别将两台 RabbitMQ 加入服务rabbitmqctl start_app
分别将两台 RabbitMQ 重新开启对外服务- 通过
rabbitmqctl cluster_status
就可以查看 RabbitMQ 的集群状态了5、RabbitMQ 高级特性
5.1、RabbitMQ 消息的可靠性传递是如何保障的?
从生产者到交换机,Confirm 确认模式(springboot-rabbitmq 默认开启)
生产者将消息发送到exchange时,会通过confirm回调,告知生产者发送状态。
//定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被执行了...."); //ack 为 true表示 消息已经到达交换机 if (ack) { //接收成功 System.out.println("接收成功消息" + cause); } else { //接收失败 System.out.println("接收失败消息" + cause); //做一些处理,让消息再次发送。 } } });
从交换机到队列,return 回退模式 (开启 rabbitmq.publisher-returns: true)
exchange 收到消息后,通过routing key匹配对应的 queue时,会通过return 回调,告知生产者发送状态
//设置交换机处理失败消息的模式 为true的时候,消息达到不了 队列时,会将消息重新返回给生产者 rabbitTemplate.setMandatory(true); //定义回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("return 执行了...."); System.out.println("message:"+message); System.out.println("replyCode:"+replyCode); System.out.println("replyText:"+replyText); System.out.println("exchange:"+exchange); System.out.println("routingKey:"+routingKey); //处理 } });
从队列到消费者,签收机制(开启:acknowledge-mode: manual)
- 消费者从队列中拉取消息,如果在消息消费过程中出现异常,我们可以手动拒收消息,然后重新拉取消息。否则正常签收。
// 手动签收 channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true); // 拒收,第二个 Boolean 入参表示消息是否 返回 对列 // 如果返回队列,存在一个问题 ,该消息将一直重新被消费,需要手动实现重试次数,比如保存到缓存中 channel.basicNack(ms.getMessageProperties().getDeliveryTag(),true,true);
- 消费者从队列中拉取消息,如果在消息消费过程中出现异常,我们可以手动拒收消息,然后重新拉取消息。否则正常签收。
exchange、queue、message 等持久化
5.2、限流
在大促或者秒杀场景下,生产者瞬间打入大量消息,可能因为消费者消费能力不够的问题,导致服务宕机。
应对这种瞬间的大流量,消费者的限流是非常有必要的。
通过指定监听器工厂,可以达到限流的效果
@Configuration
public class RabbitMqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean(name = "listenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 设置限流数量
factory.setPrefetchCount(50);
return factory;
}
}
在 @RabbitListener
注解中指定 containerFactory
@RabbitListener(queues = "zhouxhQueue",ackMode = "MANUAL",containerFactory = "listenerContainer")
public void consumer(Message ms, Channel channel) throws IOException {
System.out.println(ms);
// 手动签收
channel.basicAck(ms.getMessageProperties().getDeliveryTag(),true)
}
5.3、延迟队列
RabbitMQ 没有自带的延迟队列,但是我们可以通过 TTL队列 和 死信队列 完成延迟队列的效果。
- TTL:配置了超时时间的队列,该队列中的消息达到超时时间还没有被消费,会得删除
- 死信队列:对于RabbitMQ来说,其实是死信交换机,我们在一个队列中绑定一个死信交换机,然后当消息死信后就会通过这个死信交换机,进入一个普通队列。
下图中的“DLX”其实就是DEAD LETTER EXCHANGE(死信交换机)。
进入死心队列的三种方式
- 队列中的消息已经满了,此时新进入的消息会直接进入死信队列
- 消费者拒绝签收,且不重入队列
- 原队列设置了超时时间,消息在队列中达到超时时间没有被消费
那么延迟队列是怎么根据这两种队列完成的呢?
以我们在网上购物,下单了但是还没付款这样一个场景来说明:
- 首先我们在订单系统下单,将订单信息发送到TTL队列[假设订单保存时间为30分钟,即30分钟内付款,订单有效,否则订单过期]
- 期间我们都没有付款,那么过了30分钟该订单信息会从TTL队列中删除。
- 从TTL队列中删除的订单会被发送到死信队列,然后这个消息会被库存系统消费。判断是取消订单,还是正常发货。
5.4、消息幂等性保障(消息重复读取)
在发送消息时,携带version,在消费者消费并将消息保存在数据库时,通过乐观锁机制,保障幂等性。
5.5、消息积压问题
由于大促或者秒杀等场景,导致的消息积压问题。
- 通过工作队列模式增加消费者,提高消息消费速度。
- 消费者先将消息写入数据库,然后业务代码从数据库读取消息进行处理