一、先搞清楚:顺序消费到底顺序在哪里

很多人一提 RocketMQ 顺序消费,第一反应是“消息按顺序消费”。这句话没错,但不够准确。

更准确地说:RocketMQ 的顺序消费,是以 MessageQueue 为单位保证顺序。只要同一类业务消息被发到同一个队列里,消费端就会尽量保证这条队列同一时间只被一个线程、一个消费者实例顺序处理。

1.1 它不是全局有序,而是局部有序

那什么叫“顺序”?

别把它想复杂了。顺序消费解决的是这样一类问题:同一个业务对象的多条消息,消费时不能乱。

比如一个订单,正常状态流转可能是:

创建订单 -> 支付成功 -> 商家发货 -> 用户确认收货 -> 订单完成

如果消费端先处理了“订单完成”,后处理“支付成功”,业务状态肯定就乱了。这里我们真正关心的不是所有订单之间的全局顺序,而是同一个订单自己的消息顺序

所以 RocketMQ 的顺序消费更像是“局部有序”:同一个业务 key 的消息有序,不同业务 key 之间可以并行。

1.2 哪些场景适合用顺序消费

这也是为什么顺序消费经常出现在这些场景里:

  • 订单状态流转,比如下单、支付、发货、确认收货。
  • 交易流水处理,比如同一个账户的扣款、入账、冲正。
  • 库存变更,比如同一个商品 SKU 的扣减、回滚、补偿。
  • 数据同步,比如数据库 binlog、缓存更新、搜索索引更新。
  • 设备事件上报,比如同一台设备的上线、告警、恢复、离线。
  • 工作流推进,比如审批流、任务状态机、业务编排事件。

这些场景有个共同点:它们通常不是要求整个 Topic 全局有序,而是要求同一个业务实体内部有序。

1.3 顺序消费不要滥用

当然,顺序消费不是免费的。它为了保证顺序,会牺牲一部分并发能力。一条队列同一时间只能被一个线程消费,如果某条消息一直失败,它后面的消息也会被堵住。所以不是所有消息都应该上顺序消费。

像日志采集、埋点上报、通知推送、普通异步解耦这类场景,如果业务不关心严格先后顺序,用并发消费通常更合适,吞吐也更高。

说白了,顺序消费要解决的是“乱序会不会让业务出错”。如果会,就用;如果只是看起来有顺序更舒服,那大概率没必要。

二、生产端:先把同一业务 key 发到同一队列

生产端这边其实很简单,RocketMQ 把选择队列的能力交给了用户:

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

比如订单消息,一般会用 orderId 对队列数量取模。这样同一个订单的创建、支付、发货、完成,就会落到同一个 MessageQueue 里。

但这只是第一步。

三、消费端:三层锁把队列消费稳住

真正有意思的地方在消费端:RocketMQ 怎么保证这一条队列不会被多个线程乱序消费?

3.1 顺序消费入口:MessageListenerOrderly

入口在 DefaultMQPushConsumerImpl。只要用户注册的是 MessageListenerOrderly,客户端就会创建 ConsumeMessageOrderlyService

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}

这个类名已经把事情说得很直白了:顺序消费的核心逻辑,就在 ConsumeMessageOrderlyService 里。

3.2 第一层锁:Broker 侧队列锁

它启动时会做一件很关键的事:如果是集群消费模式,会定时去 Broker 抢队列锁。

public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                }
            }
        }, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

为什么要抢锁?

因为在集群消费下,同一个 Consumer Group 里可能有多个消费者实例。Rebalance 之后,某条 MessageQueue 理论上只会分配给一个消费者,但网络抖动、重平衡切换、客户端异常退出这些情况都可能出现。RocketMQ 不能只靠“分配关系”来赌顺序,所以它又加了一层 Broker 侧队列锁。

RebalanceImpl.lockAll() 会把当前客户端持有的队列按 Broker 分组,然后向 Broker 发起批量加锁。

Set<MessageQueue> lockOKMQSet =
    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

for (MessageQueue mq : mqs) {
    ProcessQueue processQueue = this.processQueueTable.get(mq);
    if (processQueue != null) {
        if (lockOKMQSet.contains(mq)) {
            processQueue.setLocked(true);
            processQueue.setLastLockTimestamp(System.currentTimeMillis());
        } else {
            processQueue.setLocked(false);
        }
    }
}

看到这里,顺序消费的第一层保护就出来了:

集群模式下,先确保某条 MessageQueue 在同一时刻只被一个消费者实例持有。

3.3 第二层锁:客户端本地 MessageQueueLock

但这还不够。

一个消费者实例内部也有消费线程池。如果同一条队列里的消息被多个线程同时拿走,还是会乱。所以 RocketMQ 又做了第二层保护:本地队列锁。

/**
 * Message lock,strictly ensure the single queue only one thread at a time consuming
 */
public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, ConcurrentMap<Integer, Object>> mqLockTable =
        new ConcurrentHashMap<>(32);
}

真正执行消费任务时,ConsumeRequest.run() 会先根据 MessageQueue 拿到一个本地锁对象,然后 synchronized 住。

final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
        || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
        // consume messages
    }
}

这段代码很关键。

广播模式下,每个消费者都会消费一份消息,所以不需要 Broker 侧队列锁;但在本地仍然要保证一个队列一次只被一个线程消费。

集群模式下,必须满足两个条件:

  • processQueue.isLocked():这条队列已经被当前消费者实例锁住。
  • !processQueue.isLockExpired():锁还没过期。

如果队列没锁住,或者锁过期了,RocketMQ 不会硬着头皮消费,而是稍后重新尝试加锁。

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
    && !this.processQueue.isLocked()) {
    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
    break;
}

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
    && this.processQueue.isLockExpired()) {
    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
    break;
}

看到这里还没完。

3.4 第三层锁:ProcessQueue.consumeLock

如果继续往下看源码,会发现消费真正调用业务监听器之前,还会拿一把 ProcessQueue 上的读锁:

try {
    this.processQueue.getConsumeLock().readLock().lock();
    if (this.processQueue.isDropped()) {
        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
            this.messageQueue);
        break;
    }

    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} finally {
    this.processQueue.getConsumeLock().readLock().unlock();
}

这把锁定义在 ProcessQueue 里:

private final ReadWriteLock consumeLock = new ReentrantReadWriteLock();

刚开始看这里,很容易以为“顺序消费原来还有第三把消费锁”。这么理解也没问题,但要说清它到底在防什么。

前面的 MessageQueueLock 已经能保证同一个客户端里,一条 MessageQueue 不会被多个消费线程同时执行。那 ProcessQueue.consumeLock 为什么还要存在?

答案在 Rebalance 移除队列的时候。

当某个 MessageQueue 不再属于当前消费者,RocketMQ 不能直接把它解锁、移除 offset,然后让别的消费者马上接手。因为当前消费者可能正在执行用户的 consumeMessage()。如果这时候强行释放队列,另一个消费者拿到锁后开始消费同一条队列,就有机会和老消费者重叠,顺序语义就危险了。

所以移除顺序消费队列时,RebalancePushImpl 会尝试拿 consumeLock 的写锁:

if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500, TimeUnit.MILLISECONDS)) {
    try {
        RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);

        pq.setLocked(false);
        RebalancePushImpl.this.unlock(mq, true);
        return true;
    } finally {
        if (!forceUnlock) {
            pq.getConsumeLock().writeLock().unlock();
        }
    }
}

读写锁的语义就派上用场了:

  • 正在消费时,消费线程持有 readLock
  • Rebalance 想移除并解锁队列时,需要拿 writeLock
  • 写锁拿不到,就说明还有消费在跑,先不要急着移除。

所以更准确地说,RocketMQ 顺序消费这里有三层协调:

Broker 侧锁,保证一条队列不会被多个消费者实例同时持有;MessageQueueLock,保证同一个客户端里一条队列不会被多个线程同时消费;ProcessQueue.consumeLock,保证队列移除、解锁不会和正在执行的业务消费重叠。

四、ProcessQueue:消息按 offset 排队,成功才推进位点

那消息本身是怎么保持顺序的?

4.1 msgTreeMap:本地有序缓存

这里要看 ProcessQueue

ProcessQueue 里面用的是 TreeMap<Long, MessageExt>,key 是消息在队列里的 offset。

private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();

/**
 * A subset of msgTreeMap, will only be used when orderly consume
 */
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<>();

这个设计挺巧妙。

拉消息的时候,消息先进 msgTreeMap,天然按 offset 排序。真正消费时,takeMessages() 每次从 msgTreeMap 的最小 offset 开始取,也就是 pollFirstEntry()

public List<MessageExt> takeMessages(final int batchSize) {
    List<MessageExt> result = new ArrayList<>(batchSize);
    final long now = System.currentTimeMillis();
    try {
        this.treeMapLock.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!this.msgTreeMap.isEmpty()) {
                for (int i = 0; i < batchSize; i++) {
                    Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                    if (entry != null) {
                        result.add(entry.getValue());
                        consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
                    } else {
                        break;
                    }
                }
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("take Messages exception", e);
    }

    return result;
}

也就是说,消息不是随便拿的,而是从当前队列里 offset 最小的那条开始拿。

取出来以后,消息会临时放进 consumingMsgOrderlyTreeMap。这个临时区很重要,它相当于“正在消费但还没确认”的消息区。

4.2 commit:消费成功后推进 offset

消费成功后,commit() 会清掉这个临时区,并返回下一个应该提交的 offset。

public long commit() {
    try {
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            if (msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
                msgSize.set(0);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

4.3 失败重试:失败消息不能被后面的消息越过

如果消费失败呢?

顺序消费最怕的不是失败,而是“失败后跳过去继续消费后面的消息”。一旦这么干,顺序就破了。

所以 RocketMQ 的处理方式很直接:失败就把这批消息放回去,当前队列暂停一会儿再消费。

case SUSPEND_CURRENT_QUEUE_A_MOMENT:
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
    if (checkReconsumeTimes(msgs)) {
        consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
        this.submitConsumeRequestLater(
            consumeRequest.getProcessQueue(),
            consumeRequest.getMessageQueue(),
            context.getSuspendCurrentQueueTimeMillis());
        continueConsume = false;
    } else {
        commitOffset = consumeRequest.getProcessQueue().commit();
    }
    break;

makeMessageToConsumeAgain() 做的事也很朴素:从正在消费的临时区移除,再按 offset 放回 msgTreeMap

public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
    try {
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            for (MessageExt msg : msgs) {
                this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                this.msgTreeMap.put(msg.getQueueOffset(), msg);
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("makeMessageToCosumeAgain exception", e);
    }
}

这就解释了一个线上很常见的现象:

顺序消费里,如果某条消息一直失败,它后面的消息会被堵住。

这不是 bug,而是顺序消费的代价。你要顺序,就不能绕过失败消息继续往后跑。否则业务上看到的顺序就不可信了。

4.4 消费结果:SUCCESS 才真正提交

再看消费结果的处理,默认 autoCommit = true。正常返回 SUCCESS,RocketMQ 会提交 offset。

if (context.isAutoCommit()) {
    switch (status) {
        case COMMIT:
        case ROLLBACK:
            log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                consumeRequest.getMessageQueue());
        case SUCCESS:
            commitOffset = consumeRequest.getProcessQueue().commit();
            this.getConsumerStatsManager().incConsumeOKTPS(
                consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
            break;
    }
}

最后再把提交位点更新到 OffsetStore

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

五、把整条链路串起来

这条链路串起来其实就很清楚了:

  • 生产端通过 MessageQueueSelector,把同一个业务 key 的消息发到同一个队列。
  • 消费端 Rebalance 后,在集群模式下向 Broker 抢队列锁。
  • 客户端内部再用 MessageQueueLock 保证一条队列只有一个线程消费。
  • 业务监听器执行期间持有 ProcessQueue.consumeLock 读锁,防止 Rebalance 移除、解锁队列和当前消费重叠。
  • ProcessQueueTreeMap 按 offset 缓存消息,每次从最小 offset 开始取。
  • 消费成功才提交 offset;消费失败就把消息放回队列,暂停当前队列,后面的消息不能越过它。

所以,RocketMQ 的顺序消费并不是靠某一个神奇开关完成的。它是生产端路由、Broker 队列锁、客户端本地队列锁、ProcessQueue 消费读写锁、offset 有序缓存、失败暂停重试这几块一起配合出来的结果。

最后留一句最容易踩坑的结论:

RocketMQ 保证的是同一个 MessageQueue 内的顺序,不是整个 Topic 的全局顺序。想让某一类业务有序,就必须先保证这类消息被稳定路由到同一个队列里。