一、先搞清楚:顺序消费到底顺序在哪里
很多人一提 RocketMQ 顺序消费,第一反应是“消息按顺序消费”。这句话没错,但不够准确。
更准确地说:RocketMQ 的顺序消费,是以 MessageQueue 为单位保证顺序。只要同一类业务消息被发到同一个队列里,消费端就会尽量保证这条队列同一时间只被一个线程、一个消费者实例顺序处理。
1.1 它不是全局有序,而是局部有序
那什么叫“顺序”?
别把它想复杂了。顺序消费解决的是这样一类问题:同一个业务对象的多条消息,消费时不能乱。
比如一个订单,正常状态流转可能是:
创建订单 -> 支付成功 -> 商家发货 -> 用户确认收货 -> 订单完成
如果消费端先处理了“订单完成”,后处理“支付成功”,业务状态肯定就乱了。这里我们真正关心的不是所有订单之间的全局顺序,而是同一个订单自己的消息顺序。
所以 RocketMQ 的顺序消费更像是“局部有序”:同一个业务 key 的消息有序,不同业务 key 之间可以并行。
1.2 哪些场景适合用顺序消费
这也是为什么顺序消费经常出现在这些场景里:
- 订单状态流转,比如下单、支付、发货、确认收货。
- 交易流水处理,比如同一个账户的扣款、入账、冲正。
- 库存变更,比如同一个商品 SKU 的扣减、回滚、补偿。
- 数据同步,比如数据库 binlog、缓存更新、搜索索引更新。
- 设备事件上报,比如同一台设备的上线、告警、恢复、离线。
- 工作流推进,比如审批流、任务状态机、业务编排事件。
这些场景有个共同点:它们通常不是要求整个 Topic 全局有序,而是要求同一个业务实体内部有序。
1.3 顺序消费不要滥用
当然,顺序消费不是免费的。它为了保证顺序,会牺牲一部分并发能力。一条队列同一时间只能被一个线程消费,如果某条消息一直失败,它后面的消息也会被堵住。所以不是所有消息都应该上顺序消费。
像日志采集、埋点上报、通知推送、普通异步解耦这类场景,如果业务不关心严格先后顺序,用并发消费通常更合适,吞吐也更高。
说白了,顺序消费要解决的是“乱序会不会让业务出错”。如果会,就用;如果只是看起来有顺序更舒服,那大概率没必要。
二、生产端:先把同一业务 key 发到同一队列
生产端这边其实很简单,RocketMQ 把选择队列的能力交给了用户:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
比如订单消息,一般会用 orderId 对队列数量取模。这样同一个订单的创建、支付、发货、完成,就会落到同一个 MessageQueue 里。
但这只是第一步。
三、消费端:三层锁把队列消费稳住
真正有意思的地方在消费端:RocketMQ 怎么保证这一条队列不会被多个线程乱序消费?
3.1 顺序消费入口:MessageListenerOrderly
入口在 DefaultMQPushConsumerImpl。只要用户注册的是 MessageListenerOrderly,客户端就会创建 ConsumeMessageOrderlyService。
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
这个类名已经把事情说得很直白了:顺序消费的核心逻辑,就在 ConsumeMessageOrderlyService 里。
3.2 第一层锁:Broker 侧队列锁
它启动时会做一件很关键的事:如果是集群消费模式,会定时去 Broker 抢队列锁。
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
为什么要抢锁?
因为在集群消费下,同一个 Consumer Group 里可能有多个消费者实例。Rebalance 之后,某条 MessageQueue 理论上只会分配给一个消费者,但网络抖动、重平衡切换、客户端异常退出这些情况都可能出现。RocketMQ 不能只靠“分配关系”来赌顺序,所以它又加了一层 Broker 侧队列锁。
RebalanceImpl.lockAll() 会把当前客户端持有的队列按 Broker 分组,然后向 Broker 发起批量加锁。
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mq : mqs) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (lockOKMQSet.contains(mq)) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
} else {
processQueue.setLocked(false);
}
}
}
看到这里,顺序消费的第一层保护就出来了:
集群模式下,先确保某条 MessageQueue 在同一时刻只被一个消费者实例持有。
3.3 第二层锁:客户端本地 MessageQueueLock
但这还不够。
一个消费者实例内部也有消费线程池。如果同一条队列里的消息被多个线程同时拿走,还是会乱。所以 RocketMQ 又做了第二层保护:本地队列锁。
/**
* Message lock,strictly ensure the single queue only one thread at a time consuming
*/
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, ConcurrentMap<Integer, Object>> mqLockTable =
new ConcurrentHashMap<>(32);
}
真正执行消费任务时,ConsumeRequest.run() 会先根据 MessageQueue 拿到一个本地锁对象,然后 synchronized 住。
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
// consume messages
}
}
这段代码很关键。
广播模式下,每个消费者都会消费一份消息,所以不需要 Broker 侧队列锁;但在本地仍然要保证一个队列一次只被一个线程消费。
集群模式下,必须满足两个条件:
processQueue.isLocked():这条队列已经被当前消费者实例锁住。!processQueue.isLockExpired():锁还没过期。
如果队列没锁住,或者锁过期了,RocketMQ 不会硬着头皮消费,而是稍后重新尝试加锁。
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
看到这里还没完。
3.4 第三层锁:ProcessQueue.consumeLock
如果继续往下看源码,会发现消费真正调用业务监听器之前,还会拿一把 ProcessQueue 上的读锁:
try {
this.processQueue.getConsumeLock().readLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} finally {
this.processQueue.getConsumeLock().readLock().unlock();
}
这把锁定义在 ProcessQueue 里:
private final ReadWriteLock consumeLock = new ReentrantReadWriteLock();
刚开始看这里,很容易以为“顺序消费原来还有第三把消费锁”。这么理解也没问题,但要说清它到底在防什么。
前面的 MessageQueueLock 已经能保证同一个客户端里,一条 MessageQueue 不会被多个消费线程同时执行。那 ProcessQueue.consumeLock 为什么还要存在?
答案在 Rebalance 移除队列的时候。
当某个 MessageQueue 不再属于当前消费者,RocketMQ 不能直接把它解锁、移除 offset,然后让别的消费者马上接手。因为当前消费者可能正在执行用户的 consumeMessage()。如果这时候强行释放队列,另一个消费者拿到锁后开始消费同一条队列,就有机会和老消费者重叠,顺序语义就危险了。
所以移除顺序消费队列时,RebalancePushImpl 会尝试拿 consumeLock 的写锁:
if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500, TimeUnit.MILLISECONDS)) {
try {
RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
pq.setLocked(false);
RebalancePushImpl.this.unlock(mq, true);
return true;
} finally {
if (!forceUnlock) {
pq.getConsumeLock().writeLock().unlock();
}
}
}
读写锁的语义就派上用场了:
- 正在消费时,消费线程持有
readLock。 - Rebalance 想移除并解锁队列时,需要拿
writeLock。 - 写锁拿不到,就说明还有消费在跑,先不要急着移除。
所以更准确地说,RocketMQ 顺序消费这里有三层协调:
Broker 侧锁,保证一条队列不会被多个消费者实例同时持有;MessageQueueLock,保证同一个客户端里一条队列不会被多个线程同时消费;ProcessQueue.consumeLock,保证队列移除、解锁不会和正在执行的业务消费重叠。
四、ProcessQueue:消息按 offset 排队,成功才推进位点
那消息本身是怎么保持顺序的?
4.1 msgTreeMap:本地有序缓存
这里要看 ProcessQueue。
ProcessQueue 里面用的是 TreeMap<Long, MessageExt>,key 是消息在队列里的 offset。
private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<>();
这个设计挺巧妙。
拉消息的时候,消息先进 msgTreeMap,天然按 offset 排序。真正消费时,takeMessages() 每次从 msgTreeMap 的最小 offset 开始取,也就是 pollFirstEntry()。
public List<MessageExt> takeMessages(final int batchSize) {
List<MessageExt> result = new ArrayList<>(batchSize);
final long now = System.currentTimeMillis();
try {
this.treeMapLock.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
也就是说,消息不是随便拿的,而是从当前队列里 offset 最小的那条开始拿。
取出来以后,消息会临时放进 consumingMsgOrderlyTreeMap。这个临时区很重要,它相当于“正在消费但还没确认”的消息区。
4.2 commit:消费成功后推进 offset
消费成功后,commit() 会清掉这个临时区,并返回下一个应该提交的 offset。
public long commit() {
try {
this.treeMapLock.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
if (msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
msgSize.set(0);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
4.3 失败重试:失败消息不能被后面的消息越过
如果消费失败呢?
顺序消费最怕的不是失败,而是“失败后跳过去继续消费后面的消息”。一旦这么干,顺序就破了。
所以 RocketMQ 的处理方式很直接:失败就把这批消息放回去,当前队列暂停一会儿再消费。
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
makeMessageToConsumeAgain() 做的事也很朴素:从正在消费的临时区移除,再按 offset 放回 msgTreeMap。
public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
try {
this.treeMapLock.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}
这就解释了一个线上很常见的现象:
顺序消费里,如果某条消息一直失败,它后面的消息会被堵住。
这不是 bug,而是顺序消费的代价。你要顺序,就不能绕过失败消息继续往后跑。否则业务上看到的顺序就不可信了。
4.4 消费结果:SUCCESS 才真正提交
再看消费结果的处理,默认 autoCommit = true。正常返回 SUCCESS,RocketMQ 会提交 offset。
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(
consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
}
}
最后再把提交位点更新到 OffsetStore。
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
五、把整条链路串起来
这条链路串起来其实就很清楚了:
- 生产端通过
MessageQueueSelector,把同一个业务 key 的消息发到同一个队列。 - 消费端 Rebalance 后,在集群模式下向 Broker 抢队列锁。
- 客户端内部再用
MessageQueueLock保证一条队列只有一个线程消费。 - 业务监听器执行期间持有
ProcessQueue.consumeLock读锁,防止 Rebalance 移除、解锁队列和当前消费重叠。 ProcessQueue用TreeMap按 offset 缓存消息,每次从最小 offset 开始取。- 消费成功才提交 offset;消费失败就把消息放回队列,暂停当前队列,后面的消息不能越过它。
所以,RocketMQ 的顺序消费并不是靠某一个神奇开关完成的。它是生产端路由、Broker 队列锁、客户端本地队列锁、ProcessQueue 消费读写锁、offset 有序缓存、失败暂停重试这几块一起配合出来的结果。
最后留一句最容易踩坑的结论:
RocketMQ 保证的是同一个 MessageQueue 内的顺序,不是整个 Topic 的全局顺序。想让某一类业务有序,就必须先保证这类消息被稳定路由到同一个队列里。