RocketMQ 是怎样保存消息的

1 简介

首先,在 RocketMQ 集群中创建一个 Topic,叫做 MyTestTopic,配置如下图:

这里解释一下图中的几个参数:

writeQueueNums:客户端在发送消息时,可以向多少个队列进行发送;readQueueNums:客户端在消费消息时,可以从多少个队列进行拉取;perm:当前 Topic 读写权限,2 只允许读、4 只允许写、6 允许读写,默认是 6。

RocketMQ 主要有 3 个消息相关的文件:commitlog、consumequeue 和 index。下面是这几个文件默认的路径:

[root@xxx store]# pwd
/root/store
[root@xxx store]# ls
abort  checkpoint  commitlog  config  consumequeue  index  lock

上面的 writeQueueNums 参数控制 consumequeue 的文件的数量。作为测试,我往 MyTestTopic 这个 Topic 发送了 100 条消息,这些消息保存在了 commitlog 文件。而 consumequeue 文件如下:

[root@xxx MyTestTopic]# pwd
/root/store/consumequeue/MyTestTopic
[root@xxx MyTestTopic]# ls
0  1  2  3  4  5  6  7

可以看到,consumequeue 的保存是在 consumequeue 目录下为每个 Topic 建一个目录,用保存这个 Topic 的 consumequeue 文件。consumequeue 文件为每个 Topic 基于偏移量创建了一个索引。

index 文件保存的是消息基于 key 的 HASH 索引。

2 commitlog 文件

commitlog 是 RocketMQ 保存消息的文件。commitlog 并没有按照 Topic 来分割,所有 Topic 的消息都写入同一个 commitlog。

为了追求高效写入,RocketMQ 使用了磁盘顺序写。commitlog 文件大小默认是 1G,可以通过参数 mappedFileSizeCommitLog 来修改。

下面是服务器磁盘上保存的 commitlog 文件(文件大小 1G):

[root@xxx commitlog]# pwd
/root/store/commitlog
[root@xxx commitlog]# ls
00000000000000000000  00000000001073741824

如果配置 mappedFileSizeCommitLog 参数为 1048576,也就是 1M,则服务器磁盘上保存的 commitlog 文件如下:

[root@xxx commitlog]# pwd
/root/store/commitlog
[root@xxx commitlog]# ls
00000000000000000000  00000000000001048576  00000000000002097152  00000000000003145728  00000000000004194304  00000000000005242880

可以看到:commitlog 文件的命名以保存在文件中的消息最小的偏移量来命名的,后一个文件的名字是前一个文件名加文件大小。比如上面的前两个文件,第一个文件中消息最小偏移量是 0,第二个文件中消息最小偏移量是 1048576。这样通过偏移量查找消息时可以先用二分查找找到消息所在的文件,然后通过偏移量减去文件名就可以方便地找到消息在文件中的物理地址。

下面创建文件的代码可以看到 commitlog 文件的命名:

//MappedFileQueue 类
protected MappedFile tryCreateMappedFile(long createOffset) {
 String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
 String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
   + this.mappedFileSize);
 return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
//UtilAll 类
public static String offset2FileName(final long offset) {
        final NumberFormat nf = NumberFormat.getInstance();
  //文件名长度是20
        nf.setMinimumIntegerDigits(20);
        nf.setMaximumFractionDigits(0);
        nf.setGroupingUsed(false);
        return nf.format(offset);
    }

为了让 commitlog 操作效率更高,RocketMQ 使用了 mmap 将磁盘上日志文件映射到用户态的内存地址中,减少日志文件从磁盘到用户态内存之间的数据拷贝。代码如下:

//AllocateMappedFileService 类 mmapOperation 方法
//是否开启堆外内存
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 try {
  mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
  mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
 } catch (RuntimeException e) {
  log.warn("Use default implementation.");
  mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
 }
} else {
 mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}

写入消息时,如果 isTransientStorePoolEnable 方法返回 true,则消息数据先写入堆外内存,然后异步线程把堆外内存数据刷到 PageCache,如果返回 false 则直接写入 PageCache。后面根据刷盘策略把 PageCache 中数据持久化到磁盘。如下图:

对应代码如下:

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
 try {
     //1.获取 mappedFile
  MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        //2.追加消息,如果 mappedFile 写满了,则新建一个
  result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
  switch (result.getStatus()) {
   case PUT_OK:
    break;
   case END_OF_FILE:
    unlockMappedFile = mappedFile;
    // Create a new file, re-write the message
    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
    if (null == mappedFile) {
     // XXX: warn and notify me
     log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
     return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
    }
    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    break;
   //...
  }//...
 } finally {
  putMessageLock.unlock();
 }
 //3.请求刷盘
 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
 //...
}

无论先写对堆外内存还是直接写 PageCache,文件数据都会映射到 MappedByteBuffer。如下图:

不同的是,如果消息先写入堆外内存,则 MappedByteBuffer 主要用来读消息,堆外内存用来写消息。这一定程度上实现了读写分离,减少 PageCache 写入压力。

再看一下文件映射的代码,如下:

//MappedFile 类
private void init(final String fileName, final int fileSize) throws IOException {
 this.fileName = fileName;
 this.fileSize = fileSize;
 this.file = new File(fileName);
 this.fileFromOffset = Long.parseLong(this.file.getName());
 //...
 try {
  this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
  this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
  //...
 } //省略 catch finally
}

这里使用了 Java 中 FileChannel 的 map 方法来实现 mmap。

有一个细节需要注意:创建 MappedFile 后会进行文件预热,目的是为了预先将 PageCache 加载到内存,防止读写数据发生缺页中断时再加载,影响性能。代码如下:

//AllocateMappedFileService 类 mmapOperation 方法
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
 .getMappedFileSizeCommitLog()