1. RocketMQ特性

  • 广播消息

  • 消息过滤

  • 批量发送

  • 顺序消息

  • 延迟消息

  • 事务消息

  • 消息重试

  • 消息回溯

  • 高可用

  • 消息准实时送达

  • Dashboard

2. 架构

2.1. 名词解释

  • Producer: 发送消息的角色.

  • Consumer: 消费消息的角色, 支持集群模式和广播模式两种消费方式.

  • Broker: Producer和Consumer交互, 负责收发消息, 存储消息.

  • NameServer: 是Broker的注册中心, Broker在启动时向所有的NameServer注册, 生产者Producer和消费者Consumer可以从NameServer中获取所有注册的Broker列表和Topic路由信息.

3. 源码目录结构

SourceCodeDirectory4

4. Topic路由管理

4.3. Broker注册流程

[NameServer] RouteInfoManager

  1. 加写锁.

  2. 将cluster-broker名称加入到 clusterAddrTable 中.

  3. 遍历Broker的地址信息集合, 如果地址一致但是BrokerId不一致则删除旧的信息, 保证同一个地址在map集合中只能有一条数据. 将brokerId-broker地址加入到 brokerAddrTable 中.

  4. 将Broker加入到brokerLiveTable中, 并记录收到注册请求的时间戳, 在进行心跳检测的时候需要根据这个时间戳来判断是否在规定时间内未收到Broker的请求.

  5. 如果是首次注册或者数据发生变更, 则将topic信息保存到 topicQueueTable 中.

  6. 如果发送请求的Broker不是Master, 需要获取其所属的Master地址设置到BrokerLiveInfo中并返回给Broker.

  7. 释放锁.

4.4. Topic路由信息管理

核心类为 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager , 接收到Broker注册然后维护cluster, broker, topic信息.

RouteManagerInfo

4.5. Topic路由删除

4.5.1. 删除触发时机

  • RouteInfoManager#scanNotActiveBroker : NameServer每10秒定时扫描brokerLiveTable, 如果最近一次的心跳包时间在120s之前, 则会剔除该Broker.

  • BrokerController#unregisterBrokerAll : Broker关闭时会向NameServer发送 unregisterBroker[104] 指令下线自己.

4.5.2. Broker删除流程

  1. brokerLiveTable 中删除该Broker节点.

  2. brokerAddrTable 中根据brokerId删除节点.

  3. 如果删除后该brokerName下无其他节点:

    1. 将整个brokerName节点列表删除.

    2. clusterAddrTable 中删除该brokerName. 如果删除后该cluster下无其他brokerName, 则将该cluster删除.

    3. topicQueueTable 中删除该broker下关联的所有topic路由.

4.6. Topic路由发现

查找入口: RouteInfoManager#pickupTopicRouteData : . 从 topicQueueTable 中获取到各个Broker的Topic配置信息. . 从 brokerAddrTable 中获取到各个Broker的节点信息. . 如果NameServer支持顺序消息, 则从kv配置中获取该Topic的 ORDER_TOPIC_CONFIG 配置.

5. 消息发送

5.1. 发送示例代码

Producer.java
public class Producer {
    public static final int MESSAGE_COUNT = 1000;
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);
                /*
                 * There are different ways to send message, if you don't care about the send result,you can use this way
                 * {@code
                 * producer.sendOneway(msg);
                 * }
                 */

                /*
                 * if you want to get the send result in a asynchronize way, you can use this send method
                 * {@code
                 *
                 *  producer.send(msg, new SendCallback() {
                 *  @Override
                 *  public void onSuccess(SendResult sendResult) {
                 *      // do something
                 *  }
                 *
                 *  @Override
                 *  public void onException(Throwable e) {
                 *      // do something
                 *  }
                 *});
                 *
                 *}
                 */

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        producer.shutdown();
    }
}

5.2. Producer启动流程

入口: DefaultMQProducerImpl#start

  1. 检查ProducerGroup名称是否包含特殊字符.

  2. 如果用户没有设置instanceName, 则设置为 PID#System.nanoTime().

  3. 创建 MQClientInstance , 用于和NameServer和Broker通信, 并put至全局的 MQClientManager 中, key为clientID(IP@InstanceName).

  4. 启动 MQClientInstance :

    1. 启动和NameServer通信的client.

    2. 启动定时任务: 每2分钟获取一次最新的NameServer地址.

    3. 启动 PullMessageService .

    4. 启动 RebalanceService .

    5. 启动 defaultMQProducer .

    6. 向所有的Master Broker发送心跳包, 包含自己的clientID和producerGroupName.

5.3. 消息发送流程

入口: DefaultMQProducerImpl#sendDefaultImpl

  1. 如果发送批量消息, 则会将Collection编码成一条消息.

  2. 校验消息

    1. 校验Topic合法性.

    2. 校验body长度不能为空且不能超过4MB.

  3. 根据Topic查询路由信息:

    1. 从本地 topicPublishInfoTable 查询.

    2. 如果没有则从NameServer查询( RequestCode.GET_ROUTEINFO_BY_TOPIC )并更新到本地 topicPublishInfoTable 中.

    3. 如果还是没有则用默认topic TBW102 查询路由信息.

  4. 如果最终没有Topic路由信息则抛出异常 No route info of this topic .

  5. 根据topic和brokerName选择一个MessageQueue(轮训), 如果MessageQueue为空则不会发送消息.

  6. 根据brokerName找到Master节点的ip地址.

  7. 如果消息体大于4KB则会做zlib压缩并打 0x1 tag.

  8. 如果属于事务消息则会打 0x100 tag.

  9. 如果存在Hook, 则会执行 sendMessageBefore 方法.

  10. 设置消息header.

  11. 发送消息.

  12. 如果是异步发送, 待接口返回后回调sendCallback.

  13. 如果消息发送失败, 则会重试(同步发送重试三次, 其他不会), 每次重试会规避上次发送失败的Broker. 如果开启 sendLatencyFaultEnable , 则根据latency会在一段时间内不会向该Broker发送消息.

6. 消息存储

消息储存入口: org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage

6.1. 文件结构

  • commitlog: 存放所有主题消息的目录, 文件名为起始偏移地址, 顺序写满单文件1G后继续写写到下一个文件, 对应代码中MappedFile对象.

  • config:

    • consumerFilter.json: 消费者Topic过滤信息.

    • consumerOffset.json: 消费者消费偏移量进度数据.

    • delayOffset.json: 延迟队列进度数据.

    • subscriptionGroup.json: 消费者组的配置信息.

    • topics.json: 主题配置信息.

  • consumequeue: 为方便消费者按Topic-Queue-Tag消费消息, 按每个Topic/QueueId新建目录, 在每个目录下存储消息在CommitLog中的偏移地址(8字节), 消息大小(4字节)和tag的哈希码(8字节), 单文件30w*20KB=5.72MB.

  • index: 存储消息索引数据的文件目录, 存储: key哈希码, 消息在commitlog中偏移量, 与第一条消息的时间错差值, 前一个索引编号.

  • checkpoint: 存储CommitLog, ConsumeQueue, Index文件各自最近一次刷盘的时间戳.

  • lock: 文件锁.

  • abort: 如果存在abort文件, 说明Broker异常退出.

6.2. 刷盘策略

  • 同步刷盘: Broker收到消息后, 写入内存再提交刷盘任务, 将待提交的数据(write到flush指针范围)批量写到硬盘再返回.

  • 异步刷盘: Broker收到消息后, 写入内存后直接返回, 另起一个500ms的定时线程将 pagecache 写入硬盘.

6.3. transientStorePoolEnable机制

为了缓解pagecache在高并发场景下的读写压力, RocketMQ可以开启 transientStorePoolEnable=true . Broker收到消息后写入到堆外内存后直接返回, 然后异步将堆外内存的增量数据同步到pagecache, 最后异步写入到硬盘中. 如果进程异常退出, pagecache中的数据会写入到硬盘不会丢失, 但是堆外内存的数据会丢失.

6.4. 消息存储格式

入口: org.apache.rocketmq.store.CommitLog.MessageExtEncoder#encodeorg.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend

6.5. 索引文件同步

reputService线程对比发现有新消息写入到commitlog文件后, 会从commitlog中读取读取数据写入到consumequeue和index文件中, 然后更新checkpoint.

  • reputService线程工作入口: org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput .

  • consumequeue保存入口(异步刷盘): org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch .

  • index保存入口(写满文件才会主动刷盘): org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch .

  • checkpoint保存入口: org.apache.rocketmq.store.StoreCheckpoint#flush .

6.6. 过期文件删除机制

RocketMQ每10秒钟执行一次commitlog/consumequeue文件过期检查定时任务, 符合以下条件时会从倒数第二个文件扫描到第一个文件, 扫描出 fileReservedTime(默认72小时) 时间内未写入过的文件并删除:

  • 当前时间为凌晨4点.

  • 磁盘分区使用率达到85%.

  • 人工提交删除任务.

7. 消息消费

7.1. push消费者启动流程

  1. 启动Consumer, 保存订阅关系到 subscriptionInner , 并从 pullRequestQueue 阻塞队列拉取消费请求.

  2. 启动 ConsumeMessageConcurrentlyService , 每15分钟将未消费完的消息返还给Broker.

  3. 如果是广播消费模式, 则从本地加载offset.

  4. 启动 MQClientInstance .

    1. 建立和Broker的链接.

    2. 启动一个定时任务, 每2分钟重新获取一次NameServer的地址.

    3. 启动 PullMessageService .

    4. 启动 RebalanceServiceImpl : 每20秒对每个消费的Topic获取消费队列, 如果和上次不同, 则会停止未分配给自己的queue和消费新增的queue, 然后立即转成 PullRequest 对象放到 pullRequestQueue 中让consumer开始消费, 消费完后继续将 PullRequest 对象放到队列里.

    5. 启动 DefaultMQProducerImpl .

  5. 立即执行一次rebalance.

7.2. 消息消费流程

消费入口: org.apache.rocketmq.client.impl.consumer.PullMessageService#run

  1. pullRequestQueue 阻塞队列拉取消费请求.

  2. 根据consumerGroup名称找到 DefaultMQPushConsumerImpl 处理对象.

  3. 判断如果queue被dropped, 则说明该队列被rebalance到别的消费者里了, 立即返回.

  4. 如果队列中待处理的消息大于1000条或者大于100MB, 则延迟50ms后再消费.

  5. 如果是并发消费模式, 且offset最小最大之差大于2000, 则延迟50ms后再消费.

  6. 根据Broker名称找到节点ip地址, 拉取消息 org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage .

  7. 进入拉取消息成功的回调: org.apache.rocketmq.client.consumer.PullCallback#onSuccess .

  8. 更新下一次PullRequest的offset.

  9. 如果没有拉取到消息列表, 则重复执行一次PullRequest任务.

  10. 将消息列表放入TreeMap中按offset排好序.

  11. 将消息列表按consumeMessageBatchMaxSize(默认为1)分区, 提交到消费线程池中.

  12. 执行消费前置hook: org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#executeHookBefore .

  13. 执行应用消费逻辑: org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage .

  14. 执行消费后置hook: org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#executeHookAfter .

  15. 从processQueue中删除已消费的消息列表.

  16. 如果消费失败: 广播模式下只打印日志, 集群模式下将消息重新发送到Broker延迟队列中, 如果发送失败则将消息在本地5秒后重新消费.

  17. 如果消费成功: 更新offsetTable. 如果processQueue中不存在其他待消费的消息, 则提交最大的offset, 否则提交最小的offset.

  18. 定时任务每10秒将offsetTable数据持久化, 广播模式下持久化到本地 .rocketmq_offsets 文件中, 集群模式下提交到Broker保存.

  19. 将PullRequest重新放入队列中重复执行下一次拉取消息任务.

7.3. 消费者负载均衡机制

消费者通过指定 org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy 来控制负载均衡机制.

7.4. 延迟消息消费流程

  1. RocketMQ支持 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 18个延迟级别, 每个延迟级别对应一个 SCHEDULE_TOPIC_XXXX 下的队列, 和一个定时任务.

  2. 延迟消息发送到Broker中后, 将原topic, tag, queueId设置到properties中, 然后根据延迟级别保存到 SCHEDULE_TOPIC_XXXX topic中.

  3. Broker中的定时任务会扫描queue中所有的消息, 判断当前时间和消息存储时间的差值, 如果等于延迟时间则将消息重新保存到原Topic的queue中等待消费者拉取消费.

7.5. 消息过滤机制

RocketMQ支持表达式过滤(可以细分为tag和sql92过滤方式)和类过滤方式.

  • tag: Producer指定发送消息的tag, 消费者根据tag过滤表达式拉取消息, 多个tag用 | 分隔, 通配用 * .

  • sql92: Producer指定发送消息的property, 消费者设置sql表达式过滤, 如 xxx is not null and xxx > 0 .

  • classFilter: 部署filterServer, 在其中实现 MessageFilter 接口. 消费者指定 MessageFilter 全限定名, Broker会远程调用filterServer判断是否消费.

8. 源码调试

  1. 下载源码: git clone https://github.com/apache/rocketmq.git --branch 4.9.x --single-branch

  2. 创建RocketMQ目录及配置:

    rocketmq_debug_dir
  3. 配置文件均从源码distribution目录拷贝, broker.conf加以下配置:

    broker.conf
    storePathRootDir=/path/to/rocketmq/broker/store
    namesrvAddr=localhost:9876
    autoCreateTopicEnable=false
  4. 设置环境变量 ROCKETMQ_HOME=/path/to/rocketmq/nameserver .

  5. 启动NameServer.

  6. 添加Broker启动参数 -c /path/to/rocketmq/broker/conf/broker.conf 并启动.

  7. 注释掉 org.apache.rocketmq.example.quickstart.Producer producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); 这行代码并启动.

  8. 注释掉 org.apache.rocketmq.example.quickstart.Consumer consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR); 这行代码并启动.