很多人用 RocketMQ 消费消息,第一眼看到的代码都差不多:

consumer.registerMessageListener((msgs, context) -> {
    // 处理业务逻辑
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

业务代码看起来很简单:注册一个 Listener,消息来了,RocketMQ 回调一下,处理成功就返回 CONSUME_SUCCESS

但如果只停留在这个层面,很容易把 RocketMQ 的消费过程想得太简单。真正跑起来的时候,客户端背后做了不少事情:重平衡、拉取消息、本地缓存、消费线程池投递、消费结果处理、offset 更新、失败重试。

今天我们就顺着源码,把 RocketMQ 消费消息这条链路完整串一下。

消费不是一上来就拉消息,而是先分队列

RocketMQ 的消费是围绕 MessageQueue 展开的。

一个 Topic 会被拆成多个队列。一个消费组里可能有多个消费者实例,这些消费者不能都去消费同一个队列,否则消息就乱了。所以 RocketMQ 客户端启动以后,第一件重要的事就是做 Rebalance,也就是把 Topic 下的 MessageQueue 分给当前消费组里的不同客户端。

重平衡可以理解成一次“重新分工”。

比如一个 Topic 有 8 个 MessageQueue,消费组里原来只有 2 台机器,那大概就是一台机器消费 4 个队列。后来你扩容到 4 台机器,原来的分工就不合适了,RocketMQ 需要重新算一遍,让每台机器大概消费 2 个队列。

反过来也一样。如果某台消费者下线了,它原来负责的队列不能没人消费,其他还活着的消费者也要重新分配这些队列。

所以 Rebalance 主要发生在这些场景里:

  • 消费者启动
  • 消费者下线
  • 消费组内实例数量变化
  • Topic 的 MessageQueue 信息变化
  • 订阅关系变化
  • 客户端定时触发重平衡检查

RebalanceImpl.doRebalance() 里,RocketMQ 会遍历当前消费者订阅的 Topic,然后逐个 Topic 做重平衡:

public boolean doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            this.rebalanceByTopic(topic, isOrder);
        }
    }
}

这里还要注意一点:广播消费和集群消费的分配逻辑不一样。

广播消费很简单。既然要求组内每个消费者都收到一份消息,那每个消费者都要消费这个 Topic 下的全部队列。

集群消费就要真的“分队列”了。客户端会拿到 Topic 下所有 MessageQueue,再拿到当前消费组下所有消费者实例 cidAll,然后根据分配策略算出当前客户端应该负责哪些队列。

RebalanceImpl 里,可以看到它会根据分配策略算出当前客户端应该负责哪些队列:

allocateResult = strategy.allocate(
    this.consumerGroup,
    this.mQClientFactory.getClientId(),
    mqAll,
    cidAll);

这个 strategy 就是队列分配策略。默认常见的是平均分配,目的不是让每台机器都消费所有消息,而是让一个消费组内的多个消费者一起分摊队列。

分配结果算出来以后,还不能直接开始拉消息。RocketMQ 还要对比“新分配结果”和“当前本地正在处理的队列”。

如果某个队列已经不归当前消费者了,就要把它对应的 ProcessQueue 标记为 dropped,然后持久化 offset,最后从本地表里移除:

if (!mqSet.contains(mq)) {
    pq.setDropped(true);
    removeQueueMap.put(mq, pq);
}

这个 dropped 很关键。因为消费线程池里可能还有旧的消费任务正在跑,标记 dropped 后,后续这些任务就知道这个队列已经不应该继续被当前消费者处理了。

如果某个队列是新分配给当前消费者的,RocketMQ 才会为它创建新的 ProcessQueue,计算起始 offset,然后生成 PullRequest

算完以后,RocketMQ 会为新分配到的队列创建 ProcessQueue,再生成 PullRequest

ProcessQueue pq = createProcessQueue();
long nextOffset = this.computePullFromWhere(mq);

PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);

如果是顺序消费,还会多一步:先尝试锁住这个 MessageQueue。只有锁成功了,当前消费者才会创建 ProcessQueue 并开始拉消息。

因为顺序消费要保证同一个队列同一时间只能被一个消费者处理。如果队列刚从 A 机器迁移到 B 机器,而 A 机器还没完全停下来,B 机器又马上开始消费,就可能破坏队列内顺序。所以顺序消费的 Rebalance 会更谨慎。

整个重平衡过程可以画成这样:

flowchart TD
    A([触发 Rebalance]) --> B[遍历订阅的 Topic]
    B --> C{消费模式}
    C -- 广播消费 --> D[当前消费者拿到全部 MessageQueue]
    C -- 集群消费 --> E[获取 Topic 全部 MessageQueue]
    E --> F[获取消费组内全部 ConsumerId]
    F --> G[按分配策略计算当前客户端队列]
    D --> H[对比本地 ProcessQueueTable]
    G --> H
    H --> I{队列是否仍归当前消费者?}
    I -- 不再属于当前消费者 --> J[标记 ProcessQueue dropped]
    J --> K[持久化 offset 并移除本地队列]
    I -- 新分配给当前消费者 --> L[创建 ProcessQueue]
    L --> M[计算 nextOffset]
    M --> N[生成 PullRequest]
    N --> O[交给 PullMessageService 调度]

    classDef start fill:#e8f3ff,stroke:#3b82f6,stroke-width:1.5px,color:#0f172a;
    classDef decide fill:#fff7ed,stroke:#f97316,color:#0f172a;
    classDef local fill:#f8fafc,stroke:#64748b,color:#0f172a;
    classDef action fill:#ecfdf5,stroke:#22c55e,color:#0f172a;
    class A start;
    class C,I decide;
    class B,D,E,F,G,H,J,K,L,M,N,O local;

这一步非常关键。

MessageQueue 是 Broker 上的队列,ProcessQueue 是客户端本地维护的消费快照。后面拉到的消息、正在消费的消息、消费完成后 offset 推进到哪里,都要靠它来记录。

PullRequest 可以理解成一张“下一次拉消息的任务单”。它记录了当前消费组要从哪个 MessageQueue 拉、从哪个 nextOffset 开始拉,以及拉到消息后应该放进哪个 ProcessQueue

后面 PullMessageService 会拿着这个 PullRequest 去调用 pullMessage()。拉到消息以后,RocketMQ 会更新它的 nextOffset,然后把同一个 PullRequest 再次丢回调度流程里,形成持续拉取:

case FOUND:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    processQueue.putMessage(pullResult.getMsgFoundList());
    consumeMessageService.submitConsumeRequest(...);
    executePullRequestImmediately(pullRequest);
    break;

也就是说,PullRequest 不是一次性对象。它会随着消费进度不断更新 nextOffset,然后被反复调度,推动客户端持续从 Broker 拉消息。

简单说,Rebalance 解决的是一个问题:当前消费者到底应该消费哪些队列。

队列分好了,才轮到拉消息。

消息到底是推,还是拉?

这块确实容易绕,因为 RocketMQ 的名字和底层实现不是一个维度。

先把概念拆开。

如果是真正的 Push 模型,应该是 Broker 掌握主动权:Broker 发现有消息,就主动把消息推给 Consumer。Consumer 更像一个被动接收方。

如果是真正的 Pull 模型,主动权在 Consumer 手里:Consumer 觉得自己可以继续消费了,就主动向 Broker 要消息。Broker 只是响应这次请求。

RocketMQ 的 DefaultMQPushConsumer 容易让人误会,因为名字里有 Push。但源码里,Broker 并不会无缘无故主动把消息塞给消费者。真正发起请求的是客户端。

客户端里有一个专门的线程叫 PullMessageService,它会不断从本地队列里取出 PullRequest,然后交给对应消费者去 Broker 拉消息:

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    }
}

所以从网络通信方向看,RocketMQ 的 PushConsumer 仍然是 Consumer 主动请求 Broker。

那为什么它又叫 PushConsumer?

因为它把“拉消息”这件事藏在客户端内部了。业务代码不用自己写循环,也不用自己管理每次从哪个 offset 拉。你只需要注册一个 MessageListener,消息拉回来以后,RocketMQ 会自动回调你的业务逻辑。

也就是说,RocketMQ 的 PushConsumer 可以这样理解:

  • 底层通信:Consumer 主动 Pull
  • 使用体验:框架自动回调 Listener,看起来像 Push

所以它不是“Broker 真推”,而是“客户端自动拉,拉到以后再推给业务 Listener”。

还有一个细节也很重要:RocketMQ 不是那种简单粗暴的短轮询。

短轮询是 Consumer 问 Broker:“有消息吗?”如果没有,Broker 立刻返回空结果。Consumer 过一会儿再问一次。这种方式会产生很多空请求。

RocketMQ 的 PushConsumer 更接近长轮询 Pull。客户端发起拉取请求,如果 Broker 暂时没有新消息,请求可以在 Broker 端挂起一小段时间;这段时间里如果有新消息到了,Broker 就尽快返回;如果超时还没有,再返回没有新消息。

所以它的感觉会更像推送:消息一到,Consumer 很快就被回调。

但底层主动权还是在 Consumer,因为是 Consumer 先发起了这次拉取请求。

可以用一个生活里的例子理解。

普通 Pull 像你每隔 5 分钟去前台问一次:“我的快递到了吗?”

真正 Push 像快递员主动敲门:“你的快递到了。”

RocketMQ 的 PushConsumer 更像你提前跟前台说:“我在这等一会儿,如果快递到了马上告诉我;如果一段时间还没到,你也告诉我一声。”然后你再继续发起下一次等待。

这就是为什么它用起来像 Push,但源码里是 Pull。

再看消费循环就更清楚了。

当一次拉取有消息时,RocketMQ 会把消息放进 ProcessQueue,提交给消费线程池,然后马上安排下一次拉取;如果没有新消息,也会继续安排下一次拉取:

case FOUND:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    processQueue.putMessage(pullResult.getMsgFoundList());
    consumeMessageService.submitConsumeRequest(...);
    executePullRequestImmediately(pullRequest);
    break;

case NO_NEW_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    executePullRequestImmediately(pullRequest);
    break;

所以 PushConsumer 的核心不是 Broker 推,而是客户端维护了一条持续运转的拉取链路。

一句话说清楚:RocketMQ 是 Pull 内核,Push 使用体验。Consumer 主动从 Broker 拉消息,拉到以后由客户端框架“推”给业务 Listener。

这不是文字游戏,而是一个很重要的设计点。因为拉取节奏掌握在消费者手里,客户端可以根据自己的消费能力做流控,不至于被 Broker 一股脑推爆。

拉消息之前,RocketMQ 会先看消费者扛不扛得住

DefaultMQPushConsumerImpl.pullMessage() 并不是拿到 PullRequest 就直接请求 Broker。

它会先检查当前消费状态,比如消费者是否暂停、本地缓存是否过多、顺序消费时队列锁是否拿到。

其中最典型的是本地缓存流控:

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
    return;
}

这段逻辑很好理解:如果当前 ProcessQueue 里堆了太多消息,就先别继续拉了,延迟一会儿再试。

RocketMQ 还会判断缓存消息大小、消息 offset 跨度等指标。目的都是一样的:别只顾着从 Broker 拉,得看看消费者本地有没有能力消化。

很多线上消息堆积问题,其实就卡在这里。不是 Broker 没消息,也不是消费者没启动,而是客户端发现自己本地已经堆太多了,于是主动放慢拉取节奏。

消息拉回来后,先进入 ProcessQueue

真正请求 Broker 的地方,是 pullAPIWrapper.pullKernelImpl(...)

这一步是异步拉取。Broker 返回结果以后,会进入 PullCallback.onSuccess(),如果状态是 FOUND,说明这次真的拉到了消息。

接着 RocketMQ 会做两件事。

第一,把消息放进 ProcessQueue

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

第二,把消息提交给消费服务:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

这里的 ProcessQueue 很值得多看一眼。

它内部用 TreeMap<Long, MessageExt> 保存消息,key 是消息在队列里的 offset:

for (MessageExt msg : msgs) {
    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
    if (null == old) {
        validMsgCnt++;
        this.queueOffsetMax = msg.getQueueOffset();
    }
}

为什么要用 offset 做 key?

因为消费完成以后,RocketMQ 需要知道这个队列现在最小的未消费 offset 是多少。只有这样,才能正确更新消费进度。

换句话说,ProcessQueue 不只是一个临时缓存,它还承担了消费进度计算的职责。

并发消费:消息被丢进线程池,再回调业务 Listener

如果是普通并发消费,处理类是 ConsumeMessageConcurrentlyService

它收到一批消息后,会按照 consumeMessageBatchMaxSize 拆批,然后封装成 ConsumeRequest 提交到消费线程池:

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);

真正调用业务代码的地方,在 ConsumeRequest.run() 里:

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

这就是我们平时写的 MessageListenerConcurrently 被调用的位置。

这里有个细节:传给业务 Listener 的消息列表是 Collections.unmodifiableList(msgs)。也就是说,RocketMQ 不希望业务代码随便改这批消息列表本身。

用户 Listener 返回的结果,一般就是两个:

CONSUME_SUCCESS:消费成功
RECONSUME_LATER:稍后重试

如果业务代码抛异常,或者返回 null,RocketMQ 会把它当成消费失败处理。

两种消费模式:集群消费和广播消费

说到消费结果处理之前,得先把 RocketMQ 的两种消费模式讲清楚:CLUSTERINGBROADCASTING

这两个模式解决的是同一个问题:一个消费组里有多个消费者时,一条消息到底应该被谁消费。

先看默认也是最常用的集群消费。

集群消费的语义是:同一个消费组内,一条消息只会被其中一个消费者实例消费。

比如一个消费组 order-consumer-group 里有 3 台机器,Topic 下有多条 MessageQueue。RocketMQ 会通过 Rebalance 把这些队列分给不同机器。某条消息落在某个队列里,那么它最终只会被负责这个队列的那台消费者处理。

用大白话说就是:大家组成一个小组,一起分摊这批消息。

这种模式最适合普通业务消费,比如订单创建、库存扣减、发送短信、积分发放。因为这些业务通常只希望一条消息被处理一次,不希望同一个消费组里的每台机器都执行一遍。

集群消费还有一个特点:失败消息会进入重试流程。

如果业务 Listener 返回 RECONSUME_LATER,或者消费过程中抛异常,RocketMQ 会把失败消息送回 Broker,后续再投递给这个消费组。超过最大重试次数以后,消息会进入死信队列。

广播消费就不一样了。

广播消费的语义是:同一个消费组内,每个消费者实例都会收到这条消息。

还是刚才那个例子,如果 order-consumer-group 里有 3 台机器,那么同一条消息会被这 3 台机器各消费一次。

用大白话说就是:不是分工干活,而是组内每个人都通知一遍。

这种模式更适合通知类、刷新类场景,比如本地缓存刷新、配置变更通知、某些需要每个节点都感知的事件。

但广播消费有个很重要的限制:消费失败不会像集群消费那样走 Broker 重试。

在并发消费结果处理里,RocketMQ 对广播模式的处理很直接:

case BROADCASTING:
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        MessageExt msg = consumeRequest.getMsgs().get(i);
        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
    }
    break;

也就是说,广播模式下某个消费者处理失败了,这条消息对这个消费者来说基本就丢了,不会被 Broker 重新投递。

为什么要这么设计?

因为广播消费的目标是“每个客户端都收到一份”,消费进度也更偏客户端本地语义。如果失败后还统一走 Broker 重试,很容易让广播语义变复杂:到底给哪台机器重试、重试几次、某台机器下线了怎么办,都会变得麻烦。

所以选消费模式的时候,可以记住这个判断:

再强调一下,RocketMQ 默认是集群消费。线上业务里如果没有非常明确的“每个实例都必须收到”的需求,一般不要轻易改成广播消费。

消费成功以后,RocketMQ 提交的是 offset

并发消费完成后,会进入 processConsumeResult()

如果消费成功,RocketMQ 会统计成功数量;如果消费失败,会统计失败数量,并根据消费模式决定后续动作。

广播模式下,消费失败会直接丢弃:

case BROADCASTING:
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        MessageExt msg = consumeRequest.getMsgs().get(i);
        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
    }
    break;

集群模式下,失败消息会被送回 Broker,等待后续重试:

case CLUSTERING:
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        MessageExt msg = consumeRequest.getMsgs().get(i);
        boolean result = this.sendMessageBack(msg, context);
        if (!result) {
            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            msgBackFailed.add(msg);
        }
    }
    break;

处理完成功和失败的消息以后,RocketMQ 会从 ProcessQueue 里移除已经处理过的消息,然后更新 offset:

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

这里要注意一个点:RocketMQ 提交的不是“某条消息成功了”,而是“这个队列消费到哪个 offset 了”。

这也是为什么消费逻辑里经常会强调幂等。只要 offset 还没稳定提交,或者消费结果处理过程中发生异常,消息就有可能被再次投递。

RocketMQ 保证的是至少一次投递,不保证业务天然只执行一次。幂等要由业务自己兜住。

消费失败以后,消息怎么重试?

并发消费失败时,集群模式下会调用 sendMessageBack()

从业务视角看,就是这条消息没有消费成功,后面还会再来。

从 Broker 视角看,失败消息会进入重试流程。RocketMQ 会根据消费组构造重试 Topic,后续再把消息投递给这个消费组。

如果发送回 Broker 也失败了,RocketMQ 不会马上放弃,而是把这批消息重新提交到本地消费线程池,延迟 5 秒后再消费:

if (!msgBackFailed.isEmpty()) {
    consumeRequest.getMsgs().removeAll(msgBackFailed);
    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

所以消费失败并不是简单地“返回失败就结束”。RocketMQ 背后会尽量把消息重新送回重试链路。

但重试也不是无限温柔的。超过最大重试次数以后,消息会进入死信队列,留给人工排查或补偿处理。

顺序消费:重点不是线程池,而是队列锁

并发消费追求吞吐,顺序消费追求同一个队列内的消息顺序。

RocketMQ 的顺序消费不是全局有序,而是 MessageQueue 级别有序。也就是说,同一个队列里的消息按顺序消费,不同队列之间仍然可以并行。

ConsumeMessageOrderlyService 里,消费前会先拿到当前 MessageQueue 对应的锁对象:

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
        // takeMessages and consume
    }
}

集群顺序消费还要依赖 Broker 端队列锁。只有当前消费者拿到了这个队列的锁,才允许继续消费。

这就解释了一个常见问题:为什么顺序消费吞吐会比并发消费低?

因为它用锁和队列粒度限制换来了顺序性。同一个 MessageQueue 不能随便并发消费,否则顺序就没了。

所以如果业务要求“同一个订单的消息必须有序”,生产端就要把同一个订单号路由到同一个 MessageQueue。RocketMQ 保证的是队列内有序,不是整个 Topic 全局有序。

消费链路串起来,其实就这一条线

看到这里,RocketMQ 消费消息的整体链路就比较清楚了:

flowchart TD
    A([Consumer 启动]) --> B[Rebalance 分配 MessageQueue]
    B --> C[创建 ProcessQueue]
    C --> D[生成 PullRequest]

    D --> E[PullMessageService 调度拉取请求]
    E --> F[DefaultMQPushConsumerImpl 异步拉 Broker]
    F --> G{Broker 是否返回消息?}

    G -- 有消息 --> H[写入本地 ProcessQueue]
    H --> I[ConsumeMessageService 提交消费任务]
    I --> J[消费线程池执行 ConsumeRequest]
    J --> K[回调业务 MessageListener]

    K --> L{消费结果}
    L -- 成功 --> M[从 ProcessQueue 移除已消费消息]
    L -- 失败 --> N[sendMessageBack 进入重试链路]
    N --> M
    M --> O[OffsetStore 更新消费 offset]

    G -- 暂无消息 --> P[更新 nextOffset]
    P --> Q[再次提交 PullRequest]
    O --> Q
    Q --> E

    classDef start fill:#e8f3ff,stroke:#3b82f6,stroke-width:1.5px,color:#0f172a;
    classDef local fill:#f8fafc,stroke:#64748b,color:#0f172a;
    classDef broker fill:#fff7ed,stroke:#f97316,color:#0f172a;
    classDef consume fill:#ecfdf5,stroke:#22c55e,color:#0f172a;
    classDef result fill:#fef2f2,stroke:#ef4444,color:#0f172a;

    class A start;
    class B,C,D,E,H,M,O,P,Q local;
    class F,G broker;
    class I,J,K consume;
    class L,N result;

这张图里有两个循环。

第一个循环是拉取循环:一次拉完以后,不管有没有消息,PullRequest 都会继续被调度起来,只是中间可能因为流控、暂停、异常等原因延迟一会儿。

第二个循环是失败重试:业务消费失败后,集群模式下消息会被送回 Broker,后面再通过重试 Topic 投递回来。

这条链路里有几个特别重要的关键词:

  • Rebalance:决定当前消费者消费哪些队列。
  • PullMessageService:负责调度拉消息。
  • ProcessQueue:客户端本地消费快照和消息缓存。
  • ConsumeMessageService:负责把消息投递给消费线程池。
  • OffsetStore:记录消费进度。
  • Retry Topic / DLQ:处理失败重试和死信消息。

最后总结一下

RocketMQ 消费消息不是简单地“Broker 发消息,Consumer 收消息”。

它更像是一套客户端驱动的消费流水线:先通过 Rebalance 拿到自己负责的队列,再主动从 Broker 拉消息,拉回来先进入本地 ProcessQueue,然后交给消费线程池回调业务 Listener,最后根据消费结果更新 offset 或进入重试。

理解这条链路以后,很多问题就能顺着源码定位了。

消息堆积时,先看是 Broker 端堆积,还是客户端 ProcessQueue 堆积;消费慢时,看消费线程池和业务耗时;重复消费时,看 offset 提交和业务幂等;顺序消费卡住时,看队列锁和单队列消费耗时。

RocketMQ 消费的核心,不是一个 Listener,而是 Listener 背后的整套拉取、缓存、投递、确认和重试机制。