什么是消息队列,消息队列什么用
RocketMQ
RocketMQ的工作模型
下图是 rocketMQ源码中 的架构图
- producer:生产者
- nameServer:轻量级的服务路由注册中心,主要提供了两大功能:
- broker管理:nameServer接受broker集群的注册,并提供心跳机制,来确定broker是否存活
- 路由信息管理:保存在broker集群的整个路由信息。生产者/消费者通过nameServer就可以知道整个broker集群的路由信息,并进行消息投递。
nameServer通常也是使用集群的部署方式【集群中的nameServer 不进行数据同步】,borker集群会向每个nameServer注册自己的路由信息,这确保了一台nameServer宕机的情况下,其他nameServer仍可以对外提供服务。
- broker:消息的保存和传递,通过虚拟topic和实际的queue【一个topic下包含多个queue】保存消息。包含以下服务
Remoting Module
:broker的子模块,用于处理客户端请求Client Manager
:管理客户端(生产者、消费者)并维护消费者的topic订阅情况Store Service
:提供消息的持久化服务HA Service
:提供主从消息复制的服务Index Service
:建立消息的索引,便于消息的快速查询
- consumer:消费者
前面的架构图,缺少了broker内部相关内容
- 普通消息
- 普通消息根据生产者的发送方式可以分成:同步,异步,单向消息三种。其中同/异步消息,需要RocketMQ的确认回复,单向消息无需回复。
- 顺序消息
- 顺序消息,其实是理用queue 先进先出的原理,将需要顺序消费的消息,发送到同一个queue中。在被消费时,由于先进先出的原则,确保消息的顺序性。实际上这个顺序只能确保局部有序。
- 广播消息
- 正如字面意思,有一个生产者发送的消息,所有消费者都可以进行消费且互不影响。
- 延迟消息
- 生产者发送消息时,设置一个延时等级,根据 rocketMQ 延时等级默认对应的时长,进行延时发送,消费者端与普通消息没什么区别
- 批量消息
- 生产者端在发生消息时,累积到一定数量在批量发送
- 过滤消息
- 在生产者发送消息时,设置过滤条件,消费者端根据过滤条件选择适合自己的消息进行消费。
- 事务消息
- 生产者发送的消息会被发送到
TOPIC
为RMQ_SYS_TRANS_HALF_TOPIC
对应的队列中,然后执行生产者本地事务,如果本地事务执行成功并返回成功,那么会将生产者之前发送的消息从RMQ_SYS_TRANS_HALF_TOPIC
对应的队列中移出,并移入生产者指定的队列。否则生产者发送的队列会被回退。 - 当然这只保证了本地事务和消息写入的事务一致性,如果需要确保下游消费者也达到一致性,需要分布式事务的支持。
- 使用场景:淘宝下单但是未付款,存在15分钟的等待支付时长。下单时,先给broker返回 unknown 信号,等待mq回查支付状态,判断15还没有支付,取消订单。
RocketMQ和spring_boot的整合
示例代码已上传至 GITHUB依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
配置文件
rocketmq.name-server=172.12.11.10:9876 rocketmq.producer.group=topic_test-consumerGroup
生产者
- 生产者发送的消息会被发送到
@RestController
public class ProducerControl {
Logger logger = LoggerFactory.getLogger(ProducerControl.class);
@Resource
RocketMQTemplate rocketMQTemplate;
/**
* convertAndSend()
* springboot对rocketmq API 的封装
* 方法参数
* destination:topic:tag 的拼凑,底层会根据 ":" 来分割
* payload:荷载 ,实际发送的内容
*/
@RequestMapping("/sendMSG")
public String sendMSG(){
// 普通消息
rocketMQTemplate.convertAndSend("topic_test:tagXXX","hello-world");
logger.info("普通消息发送成功");
// 顺序消息
for(int i = 0;i<10;i++){
SendResult sendResult = rocketMQTemplate.syncSendOrderly("topic_test", "order_message_" + i, "");
logger.info("顺序消息:{},发送状态:{}",sendResult.getMsgId(),sendResult.getSendStatus());
}
/**
* 广播消息: 和生产者和普通发消息一致,消费者在消费时,messageModel需要设置为 BROADCASTING 广播模式
*/
// 延迟消息--消息发送时设置延时等级 delayLevel
// syncSend(String destination, Message<?> message, long timeout, int delayLevel)
Message delayedMessage = MessageBuilder.createMessage("delayed_message", new MessageHeaders(new HashMap<>()));
rocketMQTemplate.syncSend("topic_test",delayedMessage,3000,1);
logger.info("延时消息发送成功");
// 批量消息:将多个消息合并成一个批量消息,一次发送
List<Message> messageList = new ArrayList<Message>();
for(int i = 0;i<10;i++){
messageList.add(MessageBuilder.createMessage("batch_message_"+i, new MessageHeaders(new HashMap<>())));
}
rocketMQTemplate.syncSend("topic_test",messageList);
logger.info("批量消息发送成功");
// 过滤消息:主要是消费者端过滤,设置tag 或者 sql92 模式
Message<String> tag_message = MessageBuilder.createMessage("tag_message",
new MessageHeaders(new HashMap<>()));
rocketMQTemplate.syncSend("topic_test:tag_A",tag_message);
logger.info("过滤消息发送成功");
// 事务消息
Message message = MessageBuilder.createMessage("transaction_message", new MessageHeaders(new HashMap<>()));
rocketMQTemplate.sendMessageInTransaction("topic_test",message,"");
logger.info("事务消息发送成功");
rocketMQTemplate.getProducer().setProducerGroup("");
return "发送成功";
}
}
生产者 - 事务消息监听器(需要事务消息,才需要这个类)
executeLocalTransaction: 本地事务方法,如果本地事务执行成功,则返回success,那么不会再调用回查方法(checkLocalTransaction);
checkLocalTransaction:如果本地事务方法失败,rocketMQ会执行该方法,回查本地事务是否成功。如果成功,正常写入消息到对应的topic中。
@RocketMQTransactionListener()
public class MyTransactionListener implements RocketMQLocalTransactionListener {
Logger logger = LoggerFactory.getLogger(MyTransactionListener.class);
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("executeLocalTransaction:"+message);
// 如果本地业务处理成功且返回成功,则不会在执行回查方法 checkLocalTransaction
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
logger.info("checkLocalTransaction:"+message);
return RocketMQLocalTransactionState.COMMIT;
}
}
消费者
@Component
@RocketMQMessageListener(consumerGroup = "topic_test-consumerGroup", topic = "topic_test")
public class Consumer implements RocketMQListener<String> {
Logger logger = LoggerFactory.getLogger(Consumer.class);
@Override
public void onMessage(String message) {
logger.info("消费消息:"+message);
}
}
消息持久化
由于消息队列的高可靠性要求,rocketMQ的数据必须进行持久化
下图是rocketMQ源码中提供的消息存储架构图
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。采用顺序写的方式,预先在磁盘中划定一块存储(大小为1G),然后Producer端写入消息时,按序追加到这个文件中,直到该文件存储满。再划定新存储区域(大小为1G);
ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。;
IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
根绝上面架构我们可以知道以下几个结论
- consumerQueue和IndexFile文件中保存都是消息再commitLog中的索引,【索引和数据分离】
- 消息从commitlog → consumerQueue和IndexFile,必然存在一个后台进程不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据
- 基于磁盘文件的操作还能拥有如此高的吞吐量,rocketMQ 对文件的读写做出了大量优化
- MQ在接收到一条消息时,需要将消息持久化,否则出现MQ宕机的情况,这条消息也就丢失了
- 消费者消费一条消息,需要给这条消息标记一个消费状态并持久化,否则当MQ重启,标记丢失,这个消息会被重复消费。
持久化到哪儿
rocketMQ 持久化的文件保存路径需要在配置文件中指定
配置文件地址:/rocketmq-all-4.3.2-bin-release/conf/
根据rocketMQ的启动方式,选择异步或者同步配置文件#存储路径 storePathRootDir=/opt/store #commitLog 存储路径 storePathCommitLog=/opt/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/opt/store/consumequeue #消息索引存储路径 storePathIndex=/opt/store/index #checkpoint 文件存储路径 storeCheckpoint=/opt/store/checkpoint #abort 文件存储路径 abortFile=/opt/store/abort
刷盘机制
如上图所示,rocketMQ消息持久化到磁盘,可以分为同步刷盘和异步刷盘;
- 同步刷盘:每次消息发送到broker,只有再消息真正写入磁盘,并返回成功ACK响应,生产者端才认为消息发送成功,消息可靠性强,吞吐量低
- 异步刷盘:每次消息发送到broker,只要消息被写入pageCache就会返回一个成功ACK响应。消息可能存在丢失情况,但是吞吐量高。
消息主从同步
如果broker是以集群的方式部署,那么必然存在一主多从之前的数据同步【master复制到slave】,消息复制的方式分为同步复制、异步复制 - 同步复制:等master节点和slave节点都写入成功,才给生产者返回写入成功的状态。数据安全性高,吞吐量低
- 异步复制:只要master节点写入成功,就给生产者返回写入成功的状态,然后异步从master 复制到 slave。吞吐量高,存在数据丢失的情况。
- 如何配置:conf目录下的配置文件,配置 brokerRole 参数,参数类型:ASYNC_MASTER【异步】、 SYNC_MASTER【同步】、SLAVE【从节点接受复制】
负载均衡机制
rocketMQ 负载均衡分为:Producer端发送消息时候的负载均衡、Consumer端订阅消息的负载均衡 - Producer端发送消息负载均衡:默认情况下,采用的随机递增取模的方式,就是第一次给某个topic发送消息时,会随机获取一个随机数,对consumerQueue的总数取模。然后根据模数从consumerQueue的列表中获取。此后再往该topic发送消息,就再原有的随机数上递增然后再取模。
rocketMQ还提供一个sendLatencyFaultEnable
参数,在原有的递增取模的基础上,过滤掉上次失败的broker【在一定时间内下】,源码位置:MQFaultStrategy.selectOneMessageQueue()public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 开启 sendLatencyFaultEnable 配置 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 判断是否是之前存在失败的broker,如果是,继续循环选择下一个broker // latencyFaultTolerance 该对象的更新操作在发送完成时更新,前提是打开了sendLatencyFaultEnable配置 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 未开启sendLatencyFaultEnable的情况下,随即递增取模 return tpInfo.selectOneMessageQueue(); }
- Consumer端订阅消息负载均衡:根据消费模式分为广播模式 和 集群模式
- 广播模式下:首先根据topic获取所有的messageQueue,然后遍历所有messageQueue,并拉取消息
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { // 获取topic对应的messageQueue Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { // 遍历所有messageQueue,挨个拉取消息 boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } /** * 省略集群模式 部分代码 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { // 第一次进入这里必为 空,所以省略 } List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // 移除偏移量 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 根据 配置 查看 从哪儿开始消费,从第一个、最后一个、某一时间时间戳 开始消费 long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); // 构建拉取请求 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 进行拉取消息 this.dispatchPullRequest(pullRequestList); return changed; }
- 集群模式:根据配置的负载均衡策略以及当前消费者的ClientId,进行给每个消费者分配messageQueue
case CLUSTERING: { // 获取topic的所有队列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 获取所有消费者 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); ... if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { // 根据当前消费者的ClientId和规则,进行分配messageQueue allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { ... } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); ... } break; }
- AllocateMachineRoomNearby:将同机房的Consumer和Broker优先分配在一起。
- AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
- AllocateMessageQueueAveragelyByCircle:轮询分配。轮流的给一个消费者分配一个MessageQueue
- AllocateMessageQueueByConfig:不分配,直接指定一个messageQueue列表
- AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配
- AllocateMessageQueueConsistentHash:根绝一致性hash算法来分配MessageQueue
消息的重投/重试机制
生产者-消息重投
生产者在发送消息时,出现异常导致消息没有正常发送到broker,同步消息存在重投机制、异步消息存在重试机制、单向消息没有消息安全保障。
- retryTimesWhenSendFailed:同步消息发送失败重投次数,默认2次,生产者会重新投递消息 retryTimesWhenSendFailed+1次,且不会选择上次失败的broker。
- retryTimesWhenSendAsyncFailed:异步消息失败重试次数,异步重试不会选择别的broker,仅在同一台broker尝试重试。
- retryAnotherBrokerWhenNotStoreOK:消息持久化超时或者slave同步失败,是否尝试投递其他broker。默认false
消费者-消息重试
生产者将消息正常发送带broker之后,消息消费失败,消费者会重新拉起消息重新消费。
RocketMQ 为每个topic设置了”%RETRY%+consumerGroup“的重试队列,且为需要重试的消息设置重试级别【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】,重试次数越多,重试级别越高【投递延时就越大】。
原理:消息消费失败时,rocketMQ 会将失败的消息发送到topic为”SCHEDULE_TOPIC_XXXX“的延时队列中,后台定时任务按照对应的时间重新将延时队列中的消息移入对应的topic中。
如果重试18次之后仍然失败,rocketMQ 会将失败的消息发送到死信队列中。
消息的幂等性
在确保消息可靠性时,消息重复是不可避免的,如果业务上对于消息重复比较敏感,那么需要在业务层过滤重复的消息。
这就务必确认消息存在唯一键,可以是msgId,也可以是业务的唯一标识字段,比如订单id。在实际使用上,尽量使用业务上的唯一标识,因为有可能存在msgId不一致,但是消息内容一致的情况,如消息重试。
RocketMQ是否支持高可用?如何实现高可用?
RocketMQ 本身是不支持高可用的,但是可以基于 DLedger 搭建可以自动容灾切换的 RocketMQ 集群。
- 下载rocketMQ 4.5 版本以上
- 至少准备三个节点
- 正常启动nameServer
- 根绝配置文化[
../conf/dledger
]下的配置文件启动broker即可消息丢失问题
哪些情况可能存在消息丢失
从上文我们大致可以知道消息从发送到消费经历了4个步骤: - 生产者发送消息到mq-master节点的broker
- broker主从同步
- broker持久化到磁盘
- 消费者拉取消息消费
怎么防止消息丢失
在以上4个步骤中均有可能出现消息丢失的情况,那么如何防止消息丢失也从这4点出发
生产者使用事务消息保障消息可靠性,可以保障生产者本地事务和发送到broker的消息事务一致性。【1可以避免】
RocketMQ配置同步刷盘+Dledger主从架构保证MQ自身不会丢消息【2、3可以避免】
消费者同步拉取消息,并在本地业务处理完成后,返回消费成功标识【4可以避免】