很多人用 RocketMQ 消费消息,第一眼看到的代码都差不多:
consumer.registerMessageListener((msgs, context) -> {
// 处理业务逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
业务代码看起来很简单:注册一个 Listener,消息来了,RocketMQ 回调一下,处理成功就返回 CONSUME_SUCCESS。
但如果只停留在这个层面,很容易把 RocketMQ 的消费过程想得太简单。真正跑起来的时候,客户端背后做了不少事情:重平衡、拉取消息、本地缓存、消费线程池投递、消费结果处理、offset 更新、失败重试。
今天我们就顺着源码,把 RocketMQ 消费消息这条链路完整串一下。
消费不是一上来就拉消息,而是先分队列
RocketMQ 的消费是围绕 MessageQueue 展开的。
一个 Topic 会被拆成多个队列。一个消费组里可能有多个消费者实例,这些消费者不能都去消费同一个队列,否则消息就乱了。所以 RocketMQ 客户端启动以后,第一件重要的事就是做 Rebalance,也就是把 Topic 下的 MessageQueue 分给当前消费组里的不同客户端。
重平衡可以理解成一次“重新分工”。
比如一个 Topic 有 8 个 MessageQueue,消费组里原来只有 2 台机器,那大概就是一台机器消费 4 个队列。后来你扩容到 4 台机器,原来的分工就不合适了,RocketMQ 需要重新算一遍,让每台机器大概消费 2 个队列。
反过来也一样。如果某台消费者下线了,它原来负责的队列不能没人消费,其他还活着的消费者也要重新分配这些队列。
所以 Rebalance 主要发生在这些场景里:
- 消费者启动
- 消费者下线
- 消费组内实例数量变化
- Topic 的 MessageQueue 信息变化
- 订阅关系变化
- 客户端定时触发重平衡检查
在 RebalanceImpl.doRebalance() 里,RocketMQ 会遍历当前消费者订阅的 Topic,然后逐个 Topic 做重平衡:
public boolean doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.rebalanceByTopic(topic, isOrder);
}
}
}
这里还要注意一点:广播消费和集群消费的分配逻辑不一样。
广播消费很简单。既然要求组内每个消费者都收到一份消息,那每个消费者都要消费这个 Topic 下的全部队列。
集群消费就要真的“分队列”了。客户端会拿到 Topic 下所有 MessageQueue,再拿到当前消费组下所有消费者实例 cidAll,然后根据分配策略算出当前客户端应该负责哪些队列。
在 RebalanceImpl 里,可以看到它会根据分配策略算出当前客户端应该负责哪些队列:
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
这个 strategy 就是队列分配策略。默认常见的是平均分配,目的不是让每台机器都消费所有消息,而是让一个消费组内的多个消费者一起分摊队列。
分配结果算出来以后,还不能直接开始拉消息。RocketMQ 还要对比“新分配结果”和“当前本地正在处理的队列”。
如果某个队列已经不归当前消费者了,就要把它对应的 ProcessQueue 标记为 dropped,然后持久化 offset,最后从本地表里移除:
if (!mqSet.contains(mq)) {
pq.setDropped(true);
removeQueueMap.put(mq, pq);
}
这个 dropped 很关键。因为消费线程池里可能还有旧的消费任务正在跑,标记 dropped 后,后续这些任务就知道这个队列已经不应该继续被当前消费者处理了。
如果某个队列是新分配给当前消费者的,RocketMQ 才会为它创建新的 ProcessQueue,计算起始 offset,然后生成 PullRequest。
算完以后,RocketMQ 会为新分配到的队列创建 ProcessQueue,再生成 PullRequest:
ProcessQueue pq = createProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
如果是顺序消费,还会多一步:先尝试锁住这个 MessageQueue。只有锁成功了,当前消费者才会创建 ProcessQueue 并开始拉消息。
因为顺序消费要保证同一个队列同一时间只能被一个消费者处理。如果队列刚从 A 机器迁移到 B 机器,而 A 机器还没完全停下来,B 机器又马上开始消费,就可能破坏队列内顺序。所以顺序消费的 Rebalance 会更谨慎。
整个重平衡过程可以画成这样:
flowchart TD
A([触发 Rebalance]) --> B[遍历订阅的 Topic]
B --> C{消费模式}
C -- 广播消费 --> D[当前消费者拿到全部 MessageQueue]
C -- 集群消费 --> E[获取 Topic 全部 MessageQueue]
E --> F[获取消费组内全部 ConsumerId]
F --> G[按分配策略计算当前客户端队列]
D --> H[对比本地 ProcessQueueTable]
G --> H
H --> I{队列是否仍归当前消费者?}
I -- 不再属于当前消费者 --> J[标记 ProcessQueue dropped]
J --> K[持久化 offset 并移除本地队列]
I -- 新分配给当前消费者 --> L[创建 ProcessQueue]
L --> M[计算 nextOffset]
M --> N[生成 PullRequest]
N --> O[交给 PullMessageService 调度]
classDef start fill:#e8f3ff,stroke:#3b82f6,stroke-width:1.5px,color:#0f172a;
classDef decide fill:#fff7ed,stroke:#f97316,color:#0f172a;
classDef local fill:#f8fafc,stroke:#64748b,color:#0f172a;
classDef action fill:#ecfdf5,stroke:#22c55e,color:#0f172a;
class A start;
class C,I decide;
class B,D,E,F,G,H,J,K,L,M,N,O local;这一步非常关键。
MessageQueue 是 Broker 上的队列,ProcessQueue 是客户端本地维护的消费快照。后面拉到的消息、正在消费的消息、消费完成后 offset 推进到哪里,都要靠它来记录。
PullRequest 可以理解成一张“下一次拉消息的任务单”。它记录了当前消费组要从哪个 MessageQueue 拉、从哪个 nextOffset 开始拉,以及拉到消息后应该放进哪个 ProcessQueue。
后面 PullMessageService 会拿着这个 PullRequest 去调用 pullMessage()。拉到消息以后,RocketMQ 会更新它的 nextOffset,然后把同一个 PullRequest 再次丢回调度流程里,形成持续拉取:
case FOUND:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
processQueue.putMessage(pullResult.getMsgFoundList());
consumeMessageService.submitConsumeRequest(...);
executePullRequestImmediately(pullRequest);
break;
也就是说,PullRequest 不是一次性对象。它会随着消费进度不断更新 nextOffset,然后被反复调度,推动客户端持续从 Broker 拉消息。
简单说,Rebalance 解决的是一个问题:当前消费者到底应该消费哪些队列。
队列分好了,才轮到拉消息。
消息到底是推,还是拉?
这块确实容易绕,因为 RocketMQ 的名字和底层实现不是一个维度。
先把概念拆开。
如果是真正的 Push 模型,应该是 Broker 掌握主动权:Broker 发现有消息,就主动把消息推给 Consumer。Consumer 更像一个被动接收方。
如果是真正的 Pull 模型,主动权在 Consumer 手里:Consumer 觉得自己可以继续消费了,就主动向 Broker 要消息。Broker 只是响应这次请求。
RocketMQ 的 DefaultMQPushConsumer 容易让人误会,因为名字里有 Push。但源码里,Broker 并不会无缘无故主动把消息塞给消费者。真正发起请求的是客户端。
客户端里有一个专门的线程叫 PullMessageService,它会不断从本地队列里取出 PullRequest,然后交给对应消费者去 Broker 拉消息:
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
}
}
所以从网络通信方向看,RocketMQ 的 PushConsumer 仍然是 Consumer 主动请求 Broker。
那为什么它又叫 PushConsumer?
因为它把“拉消息”这件事藏在客户端内部了。业务代码不用自己写循环,也不用自己管理每次从哪个 offset 拉。你只需要注册一个 MessageListener,消息拉回来以后,RocketMQ 会自动回调你的业务逻辑。
也就是说,RocketMQ 的 PushConsumer 可以这样理解:
- 底层通信:Consumer 主动 Pull
- 使用体验:框架自动回调 Listener,看起来像 Push
所以它不是“Broker 真推”,而是“客户端自动拉,拉到以后再推给业务 Listener”。
还有一个细节也很重要:RocketMQ 不是那种简单粗暴的短轮询。
短轮询是 Consumer 问 Broker:“有消息吗?”如果没有,Broker 立刻返回空结果。Consumer 过一会儿再问一次。这种方式会产生很多空请求。
RocketMQ 的 PushConsumer 更接近长轮询 Pull。客户端发起拉取请求,如果 Broker 暂时没有新消息,请求可以在 Broker 端挂起一小段时间;这段时间里如果有新消息到了,Broker 就尽快返回;如果超时还没有,再返回没有新消息。
所以它的感觉会更像推送:消息一到,Consumer 很快就被回调。
但底层主动权还是在 Consumer,因为是 Consumer 先发起了这次拉取请求。
可以用一个生活里的例子理解。
普通 Pull 像你每隔 5 分钟去前台问一次:“我的快递到了吗?”
真正 Push 像快递员主动敲门:“你的快递到了。”
RocketMQ 的 PushConsumer 更像你提前跟前台说:“我在这等一会儿,如果快递到了马上告诉我;如果一段时间还没到,你也告诉我一声。”然后你再继续发起下一次等待。
这就是为什么它用起来像 Push,但源码里是 Pull。
再看消费循环就更清楚了。
当一次拉取有消息时,RocketMQ 会把消息放进 ProcessQueue,提交给消费线程池,然后马上安排下一次拉取;如果没有新消息,也会继续安排下一次拉取:
case FOUND:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
processQueue.putMessage(pullResult.getMsgFoundList());
consumeMessageService.submitConsumeRequest(...);
executePullRequestImmediately(pullRequest);
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
executePullRequestImmediately(pullRequest);
break;
所以 PushConsumer 的核心不是 Broker 推,而是客户端维护了一条持续运转的拉取链路。
一句话说清楚:RocketMQ 是 Pull 内核,Push 使用体验。Consumer 主动从 Broker 拉消息,拉到以后由客户端框架“推”给业务 Listener。
这不是文字游戏,而是一个很重要的设计点。因为拉取节奏掌握在消费者手里,客户端可以根据自己的消费能力做流控,不至于被 Broker 一股脑推爆。
拉消息之前,RocketMQ 会先看消费者扛不扛得住
DefaultMQPushConsumerImpl.pullMessage() 并不是拿到 PullRequest 就直接请求 Broker。
它会先检查当前消费状态,比如消费者是否暂停、本地缓存是否过多、顺序消费时队列锁是否拿到。
其中最典型的是本地缓存流控:
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);
return;
}
这段逻辑很好理解:如果当前 ProcessQueue 里堆了太多消息,就先别继续拉了,延迟一会儿再试。
RocketMQ 还会判断缓存消息大小、消息 offset 跨度等指标。目的都是一样的:别只顾着从 Broker 拉,得看看消费者本地有没有能力消化。
很多线上消息堆积问题,其实就卡在这里。不是 Broker 没消息,也不是消费者没启动,而是客户端发现自己本地已经堆太多了,于是主动放慢拉取节奏。
消息拉回来后,先进入 ProcessQueue
真正请求 Broker 的地方,是 pullAPIWrapper.pullKernelImpl(...)。
这一步是异步拉取。Broker 返回结果以后,会进入 PullCallback.onSuccess(),如果状态是 FOUND,说明这次真的拉到了消息。
接着 RocketMQ 会做两件事。
第一,把消息放进 ProcessQueue:
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
第二,把消息提交给消费服务:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
这里的 ProcessQueue 很值得多看一眼。
它内部用 TreeMap<Long, MessageExt> 保存消息,key 是消息在队列里的 offset:
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
}
}
为什么要用 offset 做 key?
因为消费完成以后,RocketMQ 需要知道这个队列现在最小的未消费 offset 是多少。只有这样,才能正确更新消费进度。
换句话说,ProcessQueue 不只是一个临时缓存,它还承担了消费进度计算的职责。
并发消费:消息被丢进线程池,再回调业务 Listener
如果是普通并发消费,处理类是 ConsumeMessageConcurrentlyService。
它收到一批消息后,会按照 consumeMessageBatchMaxSize 拆批,然后封装成 ConsumeRequest 提交到消费线程池:
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
真正调用业务代码的地方,在 ConsumeRequest.run() 里:
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
这就是我们平时写的 MessageListenerConcurrently 被调用的位置。
这里有个细节:传给业务 Listener 的消息列表是 Collections.unmodifiableList(msgs)。也就是说,RocketMQ 不希望业务代码随便改这批消息列表本身。
用户 Listener 返回的结果,一般就是两个:
CONSUME_SUCCESS:消费成功
RECONSUME_LATER:稍后重试
如果业务代码抛异常,或者返回 null,RocketMQ 会把它当成消费失败处理。
两种消费模式:集群消费和广播消费
说到消费结果处理之前,得先把 RocketMQ 的两种消费模式讲清楚:CLUSTERING 和 BROADCASTING。
这两个模式解决的是同一个问题:一个消费组里有多个消费者时,一条消息到底应该被谁消费。
先看默认也是最常用的集群消费。
集群消费的语义是:同一个消费组内,一条消息只会被其中一个消费者实例消费。
比如一个消费组 order-consumer-group 里有 3 台机器,Topic 下有多条 MessageQueue。RocketMQ 会通过 Rebalance 把这些队列分给不同机器。某条消息落在某个队列里,那么它最终只会被负责这个队列的那台消费者处理。
用大白话说就是:大家组成一个小组,一起分摊这批消息。
这种模式最适合普通业务消费,比如订单创建、库存扣减、发送短信、积分发放。因为这些业务通常只希望一条消息被处理一次,不希望同一个消费组里的每台机器都执行一遍。
集群消费还有一个特点:失败消息会进入重试流程。
如果业务 Listener 返回 RECONSUME_LATER,或者消费过程中抛异常,RocketMQ 会把失败消息送回 Broker,后续再投递给这个消费组。超过最大重试次数以后,消息会进入死信队列。
广播消费就不一样了。
广播消费的语义是:同一个消费组内,每个消费者实例都会收到这条消息。
还是刚才那个例子,如果 order-consumer-group 里有 3 台机器,那么同一条消息会被这 3 台机器各消费一次。
用大白话说就是:不是分工干活,而是组内每个人都通知一遍。
这种模式更适合通知类、刷新类场景,比如本地缓存刷新、配置变更通知、某些需要每个节点都感知的事件。
但广播消费有个很重要的限制:消费失败不会像集群消费那样走 Broker 重试。
在并发消费结果处理里,RocketMQ 对广播模式的处理很直接:
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
也就是说,广播模式下某个消费者处理失败了,这条消息对这个消费者来说基本就丢了,不会被 Broker 重新投递。
为什么要这么设计?
因为广播消费的目标是“每个客户端都收到一份”,消费进度也更偏客户端本地语义。如果失败后还统一走 Broker 重试,很容易让广播语义变复杂:到底给哪台机器重试、重试几次、某台机器下线了怎么办,都会变得麻烦。
所以选消费模式的时候,可以记住这个判断:
再强调一下,RocketMQ 默认是集群消费。线上业务里如果没有非常明确的“每个实例都必须收到”的需求,一般不要轻易改成广播消费。
消费成功以后,RocketMQ 提交的是 offset
并发消费完成后,会进入 processConsumeResult()。
如果消费成功,RocketMQ 会统计成功数量;如果消费失败,会统计失败数量,并根据消费模式决定后续动作。
广播模式下,消费失败会直接丢弃:
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
集群模式下,失败消息会被送回 Broker,等待后续重试:
case CLUSTERING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
break;
处理完成功和失败的消息以后,RocketMQ 会从 ProcessQueue 里移除已经处理过的消息,然后更新 offset:
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
这里要注意一个点:RocketMQ 提交的不是“某条消息成功了”,而是“这个队列消费到哪个 offset 了”。
这也是为什么消费逻辑里经常会强调幂等。只要 offset 还没稳定提交,或者消费结果处理过程中发生异常,消息就有可能被再次投递。
RocketMQ 保证的是至少一次投递,不保证业务天然只执行一次。幂等要由业务自己兜住。
消费失败以后,消息怎么重试?
并发消费失败时,集群模式下会调用 sendMessageBack()。
从业务视角看,就是这条消息没有消费成功,后面还会再来。
从 Broker 视角看,失败消息会进入重试流程。RocketMQ 会根据消费组构造重试 Topic,后续再把消息投递给这个消费组。
如果发送回 Broker 也失败了,RocketMQ 不会马上放弃,而是把这批消息重新提交到本地消费线程池,延迟 5 秒后再消费:
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
所以消费失败并不是简单地“返回失败就结束”。RocketMQ 背后会尽量把消息重新送回重试链路。
但重试也不是无限温柔的。超过最大重试次数以后,消息会进入死信队列,留给人工排查或补偿处理。
顺序消费:重点不是线程池,而是队列锁
并发消费追求吞吐,顺序消费追求同一个队列内的消息顺序。
RocketMQ 的顺序消费不是全局有序,而是 MessageQueue 级别有序。也就是说,同一个队列里的消息按顺序消费,不同队列之间仍然可以并行。
在 ConsumeMessageOrderlyService 里,消费前会先拿到当前 MessageQueue 对应的锁对象:
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
// takeMessages and consume
}
}
集群顺序消费还要依赖 Broker 端队列锁。只有当前消费者拿到了这个队列的锁,才允许继续消费。
这就解释了一个常见问题:为什么顺序消费吞吐会比并发消费低?
因为它用锁和队列粒度限制换来了顺序性。同一个 MessageQueue 不能随便并发消费,否则顺序就没了。
所以如果业务要求“同一个订单的消息必须有序”,生产端就要把同一个订单号路由到同一个 MessageQueue。RocketMQ 保证的是队列内有序,不是整个 Topic 全局有序。
消费链路串起来,其实就这一条线
看到这里,RocketMQ 消费消息的整体链路就比较清楚了:
flowchart TD
A([Consumer 启动]) --> B[Rebalance 分配 MessageQueue]
B --> C[创建 ProcessQueue]
C --> D[生成 PullRequest]
D --> E[PullMessageService 调度拉取请求]
E --> F[DefaultMQPushConsumerImpl 异步拉 Broker]
F --> G{Broker 是否返回消息?}
G -- 有消息 --> H[写入本地 ProcessQueue]
H --> I[ConsumeMessageService 提交消费任务]
I --> J[消费线程池执行 ConsumeRequest]
J --> K[回调业务 MessageListener]
K --> L{消费结果}
L -- 成功 --> M[从 ProcessQueue 移除已消费消息]
L -- 失败 --> N[sendMessageBack 进入重试链路]
N --> M
M --> O[OffsetStore 更新消费 offset]
G -- 暂无消息 --> P[更新 nextOffset]
P --> Q[再次提交 PullRequest]
O --> Q
Q --> E
classDef start fill:#e8f3ff,stroke:#3b82f6,stroke-width:1.5px,color:#0f172a;
classDef local fill:#f8fafc,stroke:#64748b,color:#0f172a;
classDef broker fill:#fff7ed,stroke:#f97316,color:#0f172a;
classDef consume fill:#ecfdf5,stroke:#22c55e,color:#0f172a;
classDef result fill:#fef2f2,stroke:#ef4444,color:#0f172a;
class A start;
class B,C,D,E,H,M,O,P,Q local;
class F,G broker;
class I,J,K consume;
class L,N result;这张图里有两个循环。
第一个循环是拉取循环:一次拉完以后,不管有没有消息,PullRequest 都会继续被调度起来,只是中间可能因为流控、暂停、异常等原因延迟一会儿。
第二个循环是失败重试:业务消费失败后,集群模式下消息会被送回 Broker,后面再通过重试 Topic 投递回来。
这条链路里有几个特别重要的关键词:
- Rebalance:决定当前消费者消费哪些队列。
- PullMessageService:负责调度拉消息。
- ProcessQueue:客户端本地消费快照和消息缓存。
- ConsumeMessageService:负责把消息投递给消费线程池。
- OffsetStore:记录消费进度。
- Retry Topic / DLQ:处理失败重试和死信消息。
最后总结一下
RocketMQ 消费消息不是简单地“Broker 发消息,Consumer 收消息”。
它更像是一套客户端驱动的消费流水线:先通过 Rebalance 拿到自己负责的队列,再主动从 Broker 拉消息,拉回来先进入本地 ProcessQueue,然后交给消费线程池回调业务 Listener,最后根据消费结果更新 offset 或进入重试。
理解这条链路以后,很多问题就能顺着源码定位了。
消息堆积时,先看是 Broker 端堆积,还是客户端 ProcessQueue 堆积;消费慢时,看消费线程池和业务耗时;重复消费时,看 offset 提交和业务幂等;顺序消费卡住时,看队列锁和单队列消费耗时。
RocketMQ 消费的核心,不是一个 Listener,而是 Listener 背后的整套拉取、缓存、投递、确认和重试机制。