上一篇从设计角度讲了 RocketMQ 存储的三个核心结论:全局顺序写、PageCache 押注、读写分离后台化。这篇我们把同一套逻辑放到源码里重走一遍,看看那些设计决策落地时到底长什么样。
写入主路径:一把锁守住顺序写
消息从 SendMessageProcessor 进来的第一个核心方法是 CommitLog.asyncPutMessage()。走进去之前,先做几件不起眼但很重要的事:设置存储时间戳、算 CRC32、按 Topic 长度决定消息版本(V1 还是 V2)。这些都在拿锁之前完成,原因很简单:拿锁之后的临界区越短越好,任何能提前做的事都不要拿到锁里去做。
// 锁外:序列化消息,拿到编码后的 ByteBuffer
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 拿锁:串行化写入,保证顺序追加
putMessageLock.lock();
try {
if (null == mappedFile || mappedFile.isFull()) {
// 文件满了则创建新文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
} finally {
putMessageLock.unlock();
}
这个 putMessageLock 有三种实现,选哪种由两个配置参数决定,是个嵌套三元:
PutMessageLock adaptiveBackOffSpinLock = new AdaptiveBackOffSpinLockImpl();
this.putMessageLock = messageStore.getMessageStoreConfig().getUseABSLock() ? adaptiveBackOffSpinLock :
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
第一种:PutMessageSpinLock,最原始的自旋锁,纯 CAS 死等:
public void lock() {
boolean flag;
do {
flag = this.putMessageSpinLock.compareAndSet(true, false);
} while (!flag);
}
没有任何退让,拿不到锁就一直 CAS。低竞争时延迟最低,高竞争时 CPU 会被白白烧掉。
第二种:PutMessageReentrantLock,就是包了一层 ReentrantLock,拿不到锁的线程会被挂起,等持锁线程释放再唤醒。CPU 不浪费,但有线程切换的代价,延迟比自旋高一点。
第三种:AdaptiveBackOffSpinLockImpl,这是 RocketMQ 后来加的自适应退避锁,也是最有意思的一个。它内部同时持有 BackOffSpinLock 和 BackOffReentrantLock 两个锁实例,根据实时 TPS 和竞争强度在两者之间动态切换。
先看 BackOffSpinLock 的 lock() 怎么写的:
public void lock() {
int spinDegree = this.optimalDegree; // 初始值 1000
while (true) {
for (int i = 0; i < spinDegree; i++) {
if (this.putMessageSpinLock.compareAndSet(true, false)) {
return; // 拿到了,直接返回
}
}
// 自旋 spinDegree 次还没拿到,记一次"退避",主动 yield 一下
numberOfRetreat.get(LocalTime.now().getSecond() % 2).getAndIncrement();
Thread.sleep(0); // 让出 CPU,允许调度器切换
}
}
它不是无限 CAS,而是每轮自旋 optimalDegree 次,拿不到就记一次"退避(retreat)",调用 Thread.sleep(0) 让出 CPU,再重新进入自旋。Thread.sleep(0) 不等同于空转,它会触发一次调度器检查,当前线程有机会被换出,给其他线程运行机会。
而 optimalDegree 是动态调整的——这就是"自适应"的来源。swap() 方法在每次 unlock() 时被触发,用时间槽统计上一秒的 TPS 和退避次数:
// 退避率过高 → 竞争激烈,自旋白耗 CPU
if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO >= tps) {
if (lock.isAdapt()) {
lock.adapt(true); // 先把 optimalDegree 翻倍(上限 10000)
} else {
needSwap = true; // 已到上限,直接切换到 ReentrantLock
}
}
// 退避率很低 → 竞争不激烈,缩小自旋轮次节省 CPU
else if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO <= tps) {
lock.adapt(false); // optimalDegree 减去 1000
}
切换到 BackOffReentrantLock 后,如果后续负载降回来,会再切回 BackOffSpinLock。切换时用 state 原子布尔量做门控,等 currentThreadNum 归零(没有线程在临界区)才真正完成替换,保证新旧锁不会混用。
时间槽统计用的是 LocalTime.now().getSecond() % 2——每秒在两个槽之间交替,读"上一秒"的槽,写"这一秒"的槽,天然错开,不需要额外锁。
三种锁怎么选:低流量、竞争少,PutMessageSpinLock 延迟最低;流量高、竞争持续激烈,PutMessageReentrantLock 更省 CPU;流量高低峰变化明显,AdaptiveBackOffSpinLockImpl 自动适配,是三种里最"聪明"也最复杂的。
拿到锁之后整体只做这么几件事:找到当前可写的 MappedFile,调用 appendMessage()。MappedFile 满了就新建一个,再重试一次。
进到 appendMessagesInner() 里,核心是这几行:
int currentPos = WROTE_POSITION_UPDATER.get(this);
ByteBuffer byteBuffer = appendMessageBuffer().slice();
byteBuffer.position(currentPos);
// 将把消息序列化写入 byteBuffer
AppendMessageResult result = cb.doAppend(...);
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
WROTE_POSITION_UPDATER 是 AtomicIntegerFieldUpdater,比直接用 AtomicInteger 少一层对象包装,内存更紧凑。
这里 appendMessageBuffer() 的实现重点只有一行:
protected ByteBuffer appendMessageBuffer() {
return writeBuffer != null ? writeBuffer : this.mappedByteBuffer;
}
这里藏着 TransientStorePool 的切换点:开启堆外内存池时写 writeBuffer,否则直接写 mmap 的 mappedByteBuffer。具体怎么回事,后面讲。
mmap 初始化:文件映射就这几行
很多人以为 mmap 很复杂,看源码才发现核心就三行:
// DefaultMappedFile.init()
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
fileChannel.map() 把磁盘文件映射到虚拟内存,之后对 mappedByteBuffer 的所有读写操作都直接操作这段虚拟地址,由 OS 负责把 PageCache 里的脏页回写到磁盘。应用层感知不到磁盘,写起来像写内存。
但这里要注意的是:文件刚映射时,物理页还没建立。第一次访问会触发缺页中断,OS 才去把磁盘数据加载进内存。在高吞吐写入下,如果每次新文件都在第一次写时触发缺页,会带来不可预期的延迟毛刺。
RocketMQ 的解法是文件预热。
文件预热:把缺页中断提前消化掉
新建 MappedFile 之后,AllocateMappedFileService 会调用 warmMappedFile():
public void warmMappedFile(FlushDiskType type, int pages) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
for (long i = 0; i < this.fileSize; i += DefaultMappedFile.OS_PAGE_SIZE) {
byteBuffer.put((int) i, (byte) 0); // 每隔 4KB 写一个字节,触发缺页
}
mappedByteBuffer.force(); // 同步刷盘场景下,顺便刷一次
this.mlock(); // 锁内存 + madvise WILLNEED
}
逐页写一个零字节,目的只有一个:把这 1GB 文件对应的所有物理页提前 fault 进内存,后续正式写入时不会再触发缺页。
mlock() 里做了两件事:
LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize)); // 告诉 OS 不要把这些页换出
LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED); // 提示 OS 这块内存接下来会用到
mlock 防止 OS 在内存紧张时把已经 fault 进来的页换出去;madvise(WILLNEED) 是提前打个招呼。两个一起用,让后续写入路径尽量保持干净、无停顿。
两条刷盘路径:谁来等磁盘
消息写进内存(PageCache 或堆外 buffer)之后,handleDiskFlush() 决定怎么对待这条消息。
异步刷盘(FlushRealTimeService):写线程写完直接返回,不等磁盘。后台有一个 FlushRealTimeService 线程在跑:
// FlushRealTimeService.run()
while (!this.isStopped()) {
this.waitForRunning(interval); // 默认等 500ms
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
}
每隔 500ms(或攒够一定脏页数)批量 flush(),flush() 最终调的是 mappedByteBuffer.force() 或 fileChannel.force(false)。写线程完全不感知。
同步刷盘(GroupCommitService):写线程写完后,构造一个 GroupCommitRequest 扔给 GroupCommitService,然后等它的 CompletableFuture:
GroupCommitRequest request = new GroupCommitRequest(
result.getWroteOffset() + result.getWroteBytes(),
syncFlushTimeout
);
service.putRequest(request);
// 写线程阻塞在这里,直到 flushedPosition 推进到覆盖当前消息
PutMessageStatus status = request.future().get(syncFlushTimeout, TimeUnit.MILLISECONDS);
GroupCommitService 里用了一个经典的双 List 设计——requestsWrite 和 requestsRead。写线程往 requestsWrite 加请求,刷盘线程醒来时做一次 swapRequests() 把两个 list 对调,然后处理 requestsRead 里的请求,批量 flush。这样写线程和刷盘线程操作的不是同一个 list,减少锁竞争。
private void doCommit() {
for (GroupCommitRequest req : this.requestsRead) {
boolean flushOK = mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 1000 && !flushOK; i++) {
CommitLog.this.mappedFileQueue.flush(0);
flushOK = mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
req.wakeupCustomer(flushOK ? PUT_OK : FLUSH_DISK_TIMEOUT);
}
}
wakeupCustomer() 内部调 flushOKFuture.complete(status),写线程阻塞的 future().get() 随即返回。
TransientStorePool:堆外内存池的真实面目
TransientStorePool 的 init() 很直接:
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
LibC.INSTANCE.mlock(new Pointer(address), new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
提前分配 poolSize 个 Direct ByteBuffer(堆外内存),每个大小等于 CommitLog 文件大小(1GB)。关键点是 mlock:把这块内存锁住,OS 不会把它换出。消息写进来先序列化到这里。
新建 MappedFile 时,如果 TransientStorePool 可用,就从池里借一个 buffer:
// DefaultMappedFile.init() 的 TransientStorePool 版本
this.writeBuffer = transientStorePool.borrowBuffer();
此时 appendMessageBuffer() 返回的是 writeBuffer 而不是 mappedByteBuffer,消息先落在堆外内存。
后台的 CommitRealTimeService 负责把堆外数据提交到 FileChannel:
// CommitRealTimeService.run()
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
if (!result) {
CommitLog.this.flushManager.wakeUpFlush(); // 提交了新数据,通知刷盘线程
}
commit() 最终调 DefaultMappedFile.commit0():
protected void commit0() {
int writePos = WROTE_POSITION_UPDATER.get(this);
int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);
if (writePos - lastCommittedPosition > 0) {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer); // 从堆外内存写入 FileChannel(即 PageCache)
COMMITTED_POSITION_UPDATER.set(this, writePos);
}
}
链路就是:写入堆外 writeBuffer → CommitRealTimeService 批量 commit 到 FileChannel → FlushRealTimeService 把 FileChannel 对应的 PageCache 刷到磁盘。
ConsumeQueue 怎么建出来的
写完 CommitLog,ConsumeQueue 是怎么跟上的?答案是 ReputMessageService,一个独立的后台线程,从 CommitLog 的某个 offset 开始追:
// ReputMessageService.doReput()
for (boolean doNext = true; isCommitLogAvailable && doNext; ) {
SelectMappedBufferResult result = commitLog.getData(reputFromOffset);
DispatchRequest dispatchRequest =
commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);
DefaultMessageStore.this.doDispatch(dispatchRequest);
this.reputFromOffset += size;
}
doDispatch() 会把 DispatchRequest 分发给所有注册的 CommitLogDispatcher,其中一个就负责构建 ConsumeQueue,最终调到 ConsumeQueue.putMessagePositionInfoWrapper():
private boolean putMessagePositionInfo(long offset, int size, long tagsCode, long cqOffset) {
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); // 20 字节
this.byteBufferIndex.putLong(offset); // 8B:CommitLog 物理偏移量
this.byteBufferIndex.putInt(size); // 4B:消息大小
this.byteBufferIndex.putLong(tagsCode); // 8B:Tag 哈希值
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
这 20 字节,就是 CQ_STORE_UNIT_SIZE = 20,源码里也有注释画出了字段布局:
┌─────────────────────────────┬───────────────────┬─────────────────────────────┐
│ CommitLog Physical Offset │ Body Size │ Tag HashCode │
│ (8 Bytes) │ (4 Bytes) │ (8 Bytes) │
└─────────────────────────────┴───────────────────┴─────────────────────────────┘
消费进度 N 对应的索引在哪?N × 20 算出字节偏移量,直接定位文件和文件内位置,O(1),没有任何遍历。
reputFromOffset 是 ReputMessageService 的内部游标,它代表"索引已经追到哪"。CommitLog 的 wrotePosition 往前跑,ReputMessageService 跟着追,两者之间的差就是"消费者当前看不到的消息范围"。这个差值通常极小,但确实不是零——这是设计上主动接受的异步性。
IndexFile 的哈希槽 + 链表:看 putKey 就够了
IndexFile.putKey() 把一条消息 key 写进索引,完整流程:
public boolean putKey(String key, long phyOffset, long storeTimestamp) {
int keyHash = indexKeyHashMethod(key); // key.hashCode() 取绝对值
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // 读槽中已有的链表头
long timeDiff = (storeTimestamp - this.indexHeader.getBeginTimestamp()) / 1000;
// 把新索引项追加到 indexCount 位置
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE
+ this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 12, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue); // 前一个索引项(链表)
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); // 更新槽头
this.indexHeader.incIndexCount();
return true;
}
每条索引项 20 字节:key 哈希(4B)+ CommitLog 偏移量(8B)+ 时间差(4B)+ 前向指针(4B)。
时间差存的是"相对于该 IndexFile 第一条消息的秒数偏移",不是绝对时间戳,节省了 4 字节(用 int 而不是 long)。
查询时(selectPhyOffset())是反向遍历链表:从槽头拿到最新索引项,比对 key 哈希和时间范围,命中就收集 phyOffset,然后顺着 prevIndex 往前走。整个文件也是 mmap 的 MappedByteBuffer,查询同样在内存里完成。
消费侧零拷贝:FileRegion + transferTo
消费侧零拷贝发生在 Broker 响应 Pull 请求 时:消息已在 CommitLog 的 PageCache/mmap 里,Broker 尽量 不把它整段 copy 到堆上,而是通过 Netty FileRegion 流式写出。前面 ConsumeQueue 章节讲过,每条 CQ 索引 20 字节,其中 8 字节是 CommitLog 物理偏移、4 字节是消息大小——读路径正是用这两个字段定位 body,写入侧的 mmap 押注在这里兑现。
存储层:mmap slice,不整段读取
DefaultMessageStore.getMessage() 遍历 ConsumeQueue,对每条命中的索引取出物理偏移和大小,再调 CommitLog.getMessage(offsetPy, sizePy)。核心在 selectMappedBuffer()——对已经 mmap 的 CommitLog 文件做 slice,返回一个指向 PageCache 的视图,不会把消息读进新的 byte 数组:
// DefaultMappedFile.selectMappedBuffer(pos, size)
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
GetMessageResult.addMessage() 把每个 SelectMappedBufferResult 及其 ByteBuffer 收进列表。这里有个生命周期问题:selectMappedBuffer() 内部会 mappedFile.hold() 把 MappedFile 引用计数 +1,防止发送过程中文件被 swap 或释放。发送完成后,getMessageResult.release() 逐个释放;FileRegion 路径下由 ManyMessageTransfer.deallocate() 触发。
多条消息批量 Pull 用 ManyMessageTransfer;单条消息场景(Pop、Peek 等)用结构相同的 OneMessageTransfer,逻辑一样,只是 body 只有一个 slice。
发送层:两条路径,默认走堆拷贝
关键开关是 BrokerConfig.transferMsgByHeap,默认值为 true。官方 operation 文档建议生产环境设为 false 以提升拉消息效率——也就是说,零拷贝不是开箱即用的默认行为,需要显式关闭堆拷贝。
路径 A — 默认(transferMsgByHeap=true)
// DefaultPullMessageResultHandler.handle()
final byte[] r = this.readGetMessageResult(getMessageResult, ...);
response.setBody(r);
return response; // 走 NettyRemotingAbstract.writeResponse → 普通 RemotingCommand 编码
readGetMessageResult() 会 ByteBuffer.allocate(totalSize),把所有 mmap slice 逐条拷贝进堆上的 byte[],然后 getMessageResult.release()。这是有拷贝的路径,实现简单,兼容性最好。
路径 B — 零拷贝(transferMsgByHeap=false)
FileRegion fileRegion = new ManyMessageTransfer(
response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(f -> getMessageResult.release());
return null; // response 已直接写出,不再走 writeResponse
encodeHeader(bodyLength) 只在堆上生成 Remoting 协议头(帧长度 + header 数据),body 仍指向 mmap slice。注意 return null 是故意的:Pull 处理器不再把 response 交给 writeResponse() 二次发送——异步路径里 thenAccept(result -> writeResponse(...)) 收到 null 即表示已写出。
整体时序如下:
sequenceDiagram
participant Consumer
participant PullProcessor as PullMessageProcessor
participant Store as DefaultMessageStore
participant CL as CommitLog_mmap
participant Handler as DefaultPullMessageResultHandler
participant Netty
Consumer->>PullProcessor: PullMessageRequest
PullProcessor->>Store: getMessage()
Store->>CL: selectMappedBuffer slice
CL-->>Store: SelectMappedBufferResult
Store-->>Handler: GetMessageResult
alt transferMsgByHeap=true
Handler->>Handler: readGetMessageResult heap copy
Handler->>Netty: RemotingCommand with body[]
else transferMsgByHeap=false
Handler->>Netty: ManyMessageTransfer FileRegion
Netty->>Consumer: header + mmap slices stream
endFileRegion 怎么分段写
ManyMessageTransfer 实现了 Netty 的 FileRegion 接口(不是 Netty 自带的 DefaultFileRegion)。Netty 写出时会多次调用 transferTo(),每次只推进一块 buffer:
public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
if (this.byteBufferHeader.hasRemaining()) {
transferred += target.write(this.byteBufferHeader);
return transferred;
}
for (ByteBuffer bb : getMessageResult.getMessageBufferList()) {
if (bb.hasRemaining()) {
transferred += target.write(bb);
return transferred;
}
}
return 0;
}
@Override
public long count() {
return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
}
}
顺序是:先写协议头(byteBufferHeader),再逐条写 messageBufferList 里的 mmap slice。count() 返回 header + 全部消息体的总字节数,与 Remoting 帧格式一致。transferTo() 内部调用的是 WritableByteChannel.write(ByteBuffer),写入的是 CommitLog mmap 的 slice 视图。
零拷贝的准确含义与边界
不要把它理解成「数据完全不经过用户态」——RocketMQ 这里省掉的是 Broker 侧把整批消息 body 拷贝到 byte[] 的那一步。通过 FileRegion 让 Netty 有机会走 Channel 直写 mmap/direct buffer 的路径,避免 Broker 堆内存中转。
仍有的开销和边界:
- Remoting 协议头始终在堆上编码(
encodeHeader())。 - Consumer 收到数据后,仍要在客户端堆上解析 body。
- 开启 TLS 时,Pipeline 会插入
FileRegionEncoder,把FileRegion先写入ByteBuf再加密——零拷贝失效(类注释写得很清楚:加密传输不能直传 socket)。 - 不宜笼统说「底层一定是
FileChannel.transferTo()」;RocketMQ 自定义了FileRegion实现,走的是write(ByteBuffer)分段写出,具体是否触发内核 sendfile 取决于 JDK 和 OS 对 mapped buffer 写 socket 的优化。
本地调试:Broker 设 transferMsgByHeap=false,在 DefaultPullMessageResultHandler.handle() 打断点,确认走 writeAndFlush(fileRegion) 分支;改回 true 则进入 readGetMessageResult(),两条路径对比一目了然。
把这几个点串起来
设计篇讲了"是什么",源码篇讲的是"在哪、怎么做",两篇放在一起看,有几个细节值得记住:
写入缓冲区的二选一:appendMessageBuffer() 里那个三元表达式,writeBuffer != null 时用堆外内存,否则用 mmap,这个分支决定了整条写入链路的走向。TransientStorePool 不是默认开启的,是一个可选的性能优化,有额外的提交步骤,但可以大大减少 PageCache 的竞争。
三个位点,两个后台线程:wrotePosition 是写入边界,committedPosition 是从堆外提交到 FileChannel 的边界(只有开 TransientStorePool 时才有意义),flushedPosition 是真正落盘的边界。CommitRealTimeService 推进 committed,FlushRealTimeService 推进 flushed,两条线各跑各的。
ReputMessageService 的 offset 追赶:reputFromOffset 不是从 0 开始的,Broker 启动时会从 checkpoint 文件和 ConsumeQueue 最大 offset 中算出一个安全的起点。正常运行时追得很快,几乎是实时的;Broker 重启恢复时会有一个追赶过程,追完之后消费者才能看到完整的消息。
IndexFile 的时间差压缩:timeDiff 存的是秒级偏移量,用 int 存。这意味着一个 IndexFile 能覆盖的时间跨度最大是 Integer.MAX_VALUE 秒,约等于 68 年——完全够用,还省了一半空间。
把设计篇和源码篇合在一起,RocketMQ 存储的核心链路就基本透了。设计是理念,源码是落地。很多时候读完设计觉得"懂了",但代码里的那个三元表达式、那个双 list swap、那个 mlock 调用,才是真正把性能推上去的地方。