消息队列[2]--RocketMQ


什么是消息队列,消息队列什么用

什么是消息队列?消息队列什么用?

RocketMQ

RocketMQ的工作模型

下图是 rocketMQ源码中 的架构图

image.png

  • producer:生产者
  • nameServer:轻量级的服务路由注册中心,主要提供了两大功能:
    • broker管理:nameServer接受broker集群的注册,并提供心跳机制,来确定broker是否存活
    • 路由信息管理:保存在broker集群的整个路由信息。生产者/消费者通过nameServer就可以知道整个broker集群的路由信息,并进行消息投递。
      nameServer通常也是使用集群的部署方式【集群中的nameServer 不进行数据同步】,borker集群会向每个nameServer注册自己的路由信息,这确保了一台nameServer宕机的情况下,其他nameServer仍可以对外提供服务。
  • broker:消息的保存和传递,通过虚拟topic和实际的queue【一个topic下包含多个queue】保存消息。包含以下服务
    • Remoting Module:broker的子模块,用于处理客户端请求
    • Client Manager:管理客户端(生产者、消费者)并维护消费者的topic订阅情况
    • Store Service:提供消息的持久化服务
    • HA Service:提供主从消息复制的服务
    • Index Service:建立消息的索引,便于消息的快速查询
  • consumer:消费者

前面的架构图,缺少了broker内部相关内容

  • topic:broker上的逻辑分组,仅是一个逻辑上的概念。
  • message queue:broker上真实存在的内存区域,消息的最终目的地和消息的保存地。

    RocketMQ的7种消息

  1. 普通消息
    • 普通消息根据生产者的发送方式可以分成:同步,异步,单向消息三种。其中同/异步消息,需要RocketMQ的确认回复,单向消息无需回复。
  2. 顺序消息
    • 顺序消息,其实是理用queue 先进先出的原理,将需要顺序消费的消息,发送到同一个queue中。在被消费时,由于先进先出的原则,确保消息的顺序性。实际上这个顺序只能确保局部有序。
  3. 广播消息
    • 正如字面意思,有一个生产者发送的消息,所有消费者都可以进行消费且互不影响。
  4. 延迟消息
    • 生产者发送消息时,设置一个延时等级,根据 rocketMQ 延时等级默认对应的时长,进行延时发送,消费者端与普通消息没什么区别
  5. 批量消息
    • 生产者端在发生消息时,累积到一定数量在批量发送
  6. 过滤消息
    • 在生产者发送消息时,设置过滤条件,消费者端根据过滤条件选择适合自己的消息进行消费。
  7. 事务消息
    • 生产者发送的消息会被发送到TOPICRMQ_SYS_TRANS_HALF_TOPIC 对应的队列中,然后执行生产者本地事务,如果本地事务执行成功并返回成功,那么会将生产者之前发送的消息从RMQ_SYS_TRANS_HALF_TOPIC 对应的队列中移出,并移入生产者指定的队列。否则生产者发送的队列会被回退。
    • 当然这只保证了本地事务和消息写入的事务一致性,如果需要确保下游消费者也达到一致性,需要分布式事务的支持。
    • 使用场景:淘宝下单但是未付款,存在15分钟的等待支付时长。下单时,先给broker返回 unknown 信号,等待mq回查支付状态,判断15还没有支付,取消订单。
    image.png

    RocketMQ和spring_boot的整合

    示例代码已上传至 GITHUB

    依赖

    <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>2.2.0</version>
    </dependency>
    

    配置文件

    rocketmq.name-server=172.12.11.10:9876
    rocketmq.producer.group=topic_test-consumerGroup
    

    生产者

@RestController
public class ProducerControl {
    Logger logger = LoggerFactory.getLogger(ProducerControl.class);
    @Resource
    RocketMQTemplate rocketMQTemplate;
    /**
     * convertAndSend()
     *  springboot对rocketmq API 的封装
     *  方法参数
     *      destination:topic:tag 的拼凑,底层会根据 ":" 来分割
     *      payload:荷载 ,实际发送的内容
     */
    @RequestMapping("/sendMSG")
    public String sendMSG(){
        // 普通消息
        rocketMQTemplate.convertAndSend("topic_test:tagXXX","hello-world");
        logger.info("普通消息发送成功");
        // 顺序消息
        for(int i = 0;i<10;i++){
            SendResult sendResult = rocketMQTemplate.syncSendOrderly("topic_test", "order_message_" + i, "");
            logger.info("顺序消息:{},发送状态:{}",sendResult.getMsgId(),sendResult.getSendStatus());
        }
        /**
         * 广播消息: 和生产者和普通发消息一致,消费者在消费时,messageModel需要设置为 BROADCASTING 广播模式
         */

        // 延迟消息--消息发送时设置延时等级 delayLevel
        // syncSend(String destination, Message<?> message, long timeout, int delayLevel)
        Message delayedMessage = MessageBuilder.createMessage("delayed_message", new MessageHeaders(new HashMap<>()));
        rocketMQTemplate.syncSend("topic_test",delayedMessage,3000,1);
        logger.info("延时消息发送成功");

        // 批量消息:将多个消息合并成一个批量消息,一次发送
        List<Message> messageList = new ArrayList<Message>();
        for(int i = 0;i<10;i++){
            messageList.add(MessageBuilder.createMessage("batch_message_"+i, new MessageHeaders(new HashMap<>())));
        }
        rocketMQTemplate.syncSend("topic_test",messageList);
        logger.info("批量消息发送成功");

        // 过滤消息:主要是消费者端过滤,设置tag 或者 sql92 模式
        Message<String> tag_message = MessageBuilder.createMessage("tag_message",
                new MessageHeaders(new HashMap<>()));
        rocketMQTemplate.syncSend("topic_test:tag_A",tag_message);
        logger.info("过滤消息发送成功");

        // 事务消息
        Message message = MessageBuilder.createMessage("transaction_message", new MessageHeaders(new HashMap<>()));
        rocketMQTemplate.sendMessageInTransaction("topic_test",message,"");
        logger.info("事务消息发送成功");

        rocketMQTemplate.getProducer().setProducerGroup("");
        return "发送成功";
    }
}
生产者 - 事务消息监听器(需要事务消息,才需要这个类)

executeLocalTransaction: 本地事务方法,如果本地事务执行成功,则返回success,那么不会再调用回查方法(checkLocalTransaction);

checkLocalTransaction:如果本地事务方法失败,rocketMQ会执行该方法,回查本地事务是否成功。如果成功,正常写入消息到对应的topic中。

@RocketMQTransactionListener()
public class MyTransactionListener implements RocketMQLocalTransactionListener {
    Logger logger = LoggerFactory.getLogger(MyTransactionListener.class);

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("executeLocalTransaction:"+message);
        // 如果本地业务处理成功且返回成功,则不会在执行回查方法 checkLocalTransaction
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        logger.info("checkLocalTransaction:"+message);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

消费者

@Component
@RocketMQMessageListener(consumerGroup = "topic_test-consumerGroup", topic = "topic_test")
public class Consumer implements RocketMQListener<String> {
    Logger logger = LoggerFactory.getLogger(Consumer.class);
    @Override
    public void onMessage(String message) {
        logger.info("消费消息:"+message);
    }
}

消息持久化

由于消息队列的高可靠性要求,rocketMQ的数据必须进行持久化
下图是rocketMQ源码中提供的消息存储架构图
image.png
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

  1. CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。采用顺序写的方式,预先在磁盘中划定一块存储(大小为1G),然后Producer端写入消息时,按序追加到这个文件中,直到该文件存储满。再划定新存储区域(大小为1G);

  2. ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。;

  3. IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

根绝上面架构我们可以知道以下几个结论

  • consumerQueue和IndexFile文件中保存都是消息再commitLog中的索引,【索引和数据分离】
  • 消息从commitlog → consumerQueue和IndexFile,必然存在一个后台进程不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据
  • 基于磁盘文件的操作还能拥有如此高的吞吐量,rocketMQ 对文件的读写做出了大量优化
    • 利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址(MMAP),减少了文件从用户态复制到应用内存的性能开销。将对文件的操作转化为直接对内存地址进行操作。
    • **页缓存(PageCache)**:部分特殊的内存空间,再文件写入时,先写人pageCache,然后由操作系统的内核线程写入磁盘。

      何时持久化

  1. MQ在接收到一条消息时,需要将消息持久化,否则出现MQ宕机的情况,这条消息也就丢失了
  2. 消费者消费一条消息,需要给这条消息标记一个消费状态并持久化,否则当MQ重启,标记丢失,这个消息会被重复消费。

    持久化到哪儿

    rocketMQ 持久化的文件保存路径需要在配置文件中指定
    配置文件地址:/rocketmq-all-4.3.2-bin-release/conf/
    根据rocketMQ的启动方式,选择异步或者同步配置文件
    #存储路径
    storePathRootDir=/opt/store
    #commitLog 存储路径
    storePathCommitLog=/opt/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/opt/store/consumequeue
    #消息索引存储路径
    storePathIndex=/opt/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/opt/store/checkpoint
    #abort 文件存储路径
    abortFile=/opt/store/abort
    

    刷盘机制

image.png

如上图所示,rocketMQ消息持久化到磁盘,可以分为同步刷盘和异步刷盘;

  • 同步刷盘:每次消息发送到broker,只有再消息真正写入磁盘,并返回成功ACK响应,生产者端才认为消息发送成功,消息可靠性强,吞吐量低
  • 异步刷盘:每次消息发送到broker,只要消息被写入pageCache就会返回一个成功ACK响应。消息可能存在丢失情况,但是吞吐量高。

    消息主从同步

    如果broker是以集群的方式部署,那么必然存在一主多从之前的数据同步【master复制到slave】,消息复制的方式分为同步复制、异步复制
  • 同步复制:等master节点和slave节点都写入成功,才给生产者返回写入成功的状态。数据安全性高,吞吐量低
  • 异步复制:只要master节点写入成功,就给生产者返回写入成功的状态,然后异步从master 复制到 slave。吞吐量高,存在数据丢失的情况。
  • 如何配置:conf目录下的配置文件,配置 brokerRole 参数,参数类型:ASYNC_MASTER【异步】、 SYNC_MASTER【同步】、SLAVE【从节点接受复制】

    负载均衡机制

    rocketMQ 负载均衡分为:Producer端发送消息时候的负载均衡Consumer端订阅消息的负载均衡
  • Producer端发送消息负载均衡:默认情况下,采用的随机递增取模的方式,就是第一次给某个topic发送消息时,会随机获取一个随机数,对consumerQueue的总数取模。然后根据模数从consumerQueue的列表中获取。此后再往该topic发送消息,就再原有的随机数上递增然后再取模。
    rocketMQ还提供一个sendLatencyFaultEnable参数,在原有的递增取模的基础上,过滤掉上次失败的broker【在一定时间内下】,源码位置:MQFaultStrategy.selectOneMessageQueue()
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 开启 sendLatencyFaultEnable 配置
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断是否是之前存在失败的broker,如果是,继续循环选择下一个broker
                    // latencyFaultTolerance 该对象的更新操作在发送完成时更新,前提是打开了sendLatencyFaultEnable配置
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
    
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 未开启sendLatencyFaultEnable的情况下,随即递增取模
            return tpInfo.selectOneMessageQueue();
        }
    
  • Consumer端订阅消息负载均衡:根据消费模式分为广播模式 和 集群模式
    • 广播模式下:首先根据topic获取所有的messageQueue,然后遍历所有messageQueue,并拉取消息
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // 获取topic对应的messageQueue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    // 遍历所有messageQueue,挨个拉取消息
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            /**
             * 省略集群模式 部分代码
             */
    
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;
        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
        // 第一次进入这里必为 空,所以省略
        }
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }
                // 移除偏移量
                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                // 根据 配置 查看 从哪儿开始消费,从第一个、最后一个、某一时间时间戳 开始消费
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        // 构建拉取请求
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
        // 进行拉取消息
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }
    
    • 集群模式:根据配置的负载均衡策略以及当前消费者的ClientId,进行给每个消费者分配messageQueue
      case CLUSTERING: {
           // 获取topic的所有队列
           Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
           // 获取所有消费者
           List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
           ...
           if (mqSet != null && cidAll != null) {
               List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
               mqAll.addAll(mqSet);
               Collections.sort(mqAll);
               Collections.sort(cidAll);
               AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
               List<MessageQueue> allocateResult = null;
               try {
                   // 根据当前消费者的ClientId和规则,进行分配messageQueue
                   allocateResult = strategy.allocate(
                       this.consumerGroup,
                       this.mQClientFactory.getClientId(),
                       mqAll,
                       cidAll);
               } catch (Throwable e) {
                   ...
               }
               Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
               if (allocateResult != null) {
                   allocateResultSet.addAll(allocateResult);
               }
               boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
               ...
           }
           break;
       }
      
    集群负载均衡策略
    • AllocateMachineRoomNearby:将同机房的Consumer和Broker优先分配在一起。
    • AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
    • AllocateMessageQueueAveragelyByCircle:轮询分配。轮流的给一个消费者分配一个MessageQueue
    • AllocateMessageQueueByConfig:不分配,直接指定一个messageQueue列表
    • AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配
    • AllocateMessageQueueConsistentHash:根绝一致性hash算法来分配MessageQueue
      image.png

      消息的重投/重试机制

      生产者-消息重投

      生产者在发送消息时,出现异常导致消息没有正常发送到broker,同步消息存在重投机制、异步消息存在重试机制、单向消息没有消息安全保障。
  • retryTimesWhenSendFailed:同步消息发送失败重投次数,默认2次,生产者会重新投递消息 retryTimesWhenSendFailed+1次,且不会选择上次失败的broker。
  • retryTimesWhenSendAsyncFailed:异步消息失败重试次数,异步重试不会选择别的broker,仅在同一台broker尝试重试。
  • retryAnotherBrokerWhenNotStoreOK:消息持久化超时或者slave同步失败,是否尝试投递其他broker。默认false

    消费者-消息重试

    生产者将消息正常发送带broker之后,消息消费失败,消费者会重新拉起消息重新消费。

RocketMQ 为每个topic设置了”%RETRY%+consumerGroup“的重试队列,且为需要重试的消息设置重试级别【1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h】,重试次数越多,重试级别越高【投递延时就越大】。

原理:消息消费失败时,rocketMQ 会将失败的消息发送到topic为”SCHEDULE_TOPIC_XXXX“的延时队列中,后台定时任务按照对应的时间重新将延时队列中的消息移入对应的topic中。

如果重试18次之后仍然失败,rocketMQ 会将失败的消息发送到死信队列中。

消息的幂等性

在确保消息可靠性时,消息重复是不可避免的,如果业务上对于消息重复比较敏感,那么需要在业务层过滤重复的消息。

这就务必确认消息存在唯一键,可以是msgId,也可以是业务的唯一标识字段,比如订单id。在实际使用上,尽量使用业务上的唯一标识,因为有可能存在msgId不一致,但是消息内容一致的情况,如消息重试。

RocketMQ是否支持高可用?如何实现高可用?

RocketMQ 本身是不支持高可用的,但是可以基于 DLedger 搭建可以自动容灾切换的 RocketMQ 集群。

  1. 下载rocketMQ 4.5 版本以上
  2. 至少准备三个节点
  3. 正常启动nameServer
  4. 根绝配置文化[../conf/dledger]下的配置文件启动broker即可

    消息丢失问题

    哪些情况可能存在消息丢失

    从上文我们大致可以知道消息从发送到消费经历了4个步骤:
  5. 生产者发送消息到mq-master节点的broker
  6. broker主从同步
  7. broker持久化到磁盘
  8. 消费者拉取消息消费

image.png

怎么防止消息丢失

在以上4个步骤中均有可能出现消息丢失的情况,那么如何防止消息丢失也从这4点出发

  1. 生产者使用事务消息保障消息可靠性,可以保障生产者本地事务和发送到broker的消息事务一致性。【1可以避免】

  2. RocketMQ配置同步刷盘+Dledger主从架构保证MQ自身不会丢消息【2、3可以避免】

  3. 消费者同步拉取消息,并在本地业务处理完成后,返回消费成功标识【4可以避免】


文章作者: zhouxh-z
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 zhouxh-z !
  目录