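RocketMQ source code analysis: message storage and memory mapping

Overview

In this chapter

  • Obtaining a mapped file
  • Creating a mapped file
  • Writing messages

Basic concepts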

http://img.cana.space/picStore/20201129105939.png

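  1. Mapped file: that is, MappedFile.

  2. Offset: each CommitLog file is 1 GB in size. Normally the first CommitLog starts at offset 0 and the second at offset 1073741824 (1 GB = 1073741824 bytes). If 1073742827 is a physical offset (the physical offset is the global offset), its relative offset is 1003 (1003 = 1073742827 - 1073741824), and it falls in the second CommitLog.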
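The offset arithmetic above can be sketched in a few lines. This is only an illustration, assuming the default 1 GB file size; the class and method names are hypothetical and are not part of RocketMQ.

```java
final class CommitLogOffsetDemo {
    // Assumed CommitLog file size: 1 GB, as described above.
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    /** Zero-based index of the CommitLog file that contains the physical offset. */
    static long fileIndex(long physicalOffset) {
        return physicalOffset / MAPPED_FILE_SIZE;
    }

    /** Starting physical offset of the file that contains the physical offset. */
    static long fileFromOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % MAPPED_FILE_SIZE);
    }

    /** Position of the physical offset inside its own file. */
    static long relativeOffset(long physicalOffset) {
        return physicalOffset % MAPPED_FILE_SIZE;
    }

    public static void main(String[] args) {
        long physicalOffset = 1073742827L;
        System.out.println("file index       = " + fileIndex(physicalOffset));
        System.out.println("file from offset = " + fileFromOffset(physicalOffset));
        System.out.println("relative offset  = " + relativeOffset(physicalOffset));
    }
}
```

For the offset 1073742827 this yields index 1 (the second file), a file start of 1073741824 and a relative offset of 1003, matching the numbers in the text.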

Main classes

http://img.cana.space/picStore/20201129110135.png

Message write flow

http://img.cana.space/picStore/20201129110159.png

Obtaining the mapped file

Call flow

  1. DefaultMessageStore#putMessage(MessageExtBrokerInner msg)

  2. CommitLog#putMessage(final MessageExtBrokerInner msg)

  3. MappedFileQueue#getLastMappedFile()

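Source analysis

Getting the file

org.apache.rocketmq.store.CommitLog#putMessage

// Get the memory-mapped file of the most recent CommitLog file (zero copy)
// One MappedFile corresponds to one CommitLog segment file; mappedFileQueue manages these consecutive segment files
// For now, think of MappedFile as a high-performance disk I/O interface
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();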

org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()

/**
 * Returns the last MappedFile.
 *
 * @return the last MappedFile, or null if mappedFiles is empty
 */
public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;

    while (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}

Where the files come from

org.apache.rocketmq.store.MappedFileQueue

/**
 * All mapped files ({@link MappedFile}) maintained by this MappedFileQueue
 */
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

...
  
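    /**
     * Initializes mappedFiles in the MappedFileQueue when the broker starts.
     * <p>
     * The method first sorts the CommitLog files by file name in ascending order. When it meets a file whose size
     * is not mappedFileSize, it logs a warning and returns false, so that file and the files after it are not loaded.
     * @return
     */
    public boolean load() {
        File dir = new File(this.storePath); // storage directory of the CommitLog files
        File[] files = dir.listFiles(); // list all files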
        if (files != null) {
            // ascending order
            Arrays.sort(files);
            for (File file : files) {

                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                            + " length not matched message store config value, please check it manually");
                    return false;
                }

                try {
                    // Create a MappedFile from the CommitLog file path and mappedFileSize (1 GB by default)
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }

        return true;
    }
...
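Why sorting by file name gives offset order can be seen from a small sketch. It assumes that file names are start offsets zero-padded to 20 digits (as noted later for fileFromOffset); the helper names are hypothetical, and String.format merely stands in for whatever formatting RocketMQ uses internally.

```java
import java.util.Arrays;

final class CommitLogFileNameOrderDemo {
    /** Hypothetical helper: formats a start offset as a 20-digit, zero-padded file name. */
    static String fileNameOf(long startOffset) {
        return String.format("%020d", startOffset);
    }

    /**
     * Sorts the names lexicographically (the order a plain sort of file names produces)
     * and parses each name back into its start offset.
     */
    static long[] startOffsetsInLoadOrder(String[] fileNames) {
        String[] sorted = fileNames.clone();
        Arrays.sort(sorted);
        long[] offsets = new long[sorted.length];
        for (int i = 0; i < sorted.length; i++) {
            offsets[i] = Long.parseLong(sorted[i]);
        }
        return offsets;
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        String[] names = {fileNameOf(2 * oneGb), fileNameOf(0), fileNameOf(oneGb)};
        System.out.println(Arrays.toString(startOffsetsInLoadOrder(names)));
    }
}
```

Because every name has the same width, lexicographic order and numeric order coincide, which is what lets load() rely on a plain sort.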

Creating the mapped file

Call flow for creating a mapped file

  1. CommitLog#putMessage(final MessageExtBrokerInner msg)

  2. MappedFileQueue#getLastMappedFile(final long startOffset, boolean needCreate)

  3. AllocateMappedFileService#putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize)

AllocateRequest

http://img.cana.space/picStore/20201129113325.png

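To improve performance, RocketMQ creates the next CommitLog together with the one currently being created. Of the two files, the one with the smaller file name is therefore the CommitLog to use this time.

mappedFile is passed in as a reference variable: it is null before the call and is created and filled in inside the method.

AllocateMappedFileService: the service thread that allocates mapped files

http://img.cana.space/picStore/20201129113753.png

The wait/notify model in the allocation logic

http://img.cana.space/picStore/20201129114134.png

Source analysis

Entry point for file creation: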

org.apache.rocketmq.store.CommitLog#putMessage

// Call MappedFileQueue#getLastMappedFile() to get the last mapped file
// If there is no mapped file yet, or the last mapped file is already full, create a new mapped file and return it
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}

org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long, boolean), called with the arguments 0 and true

/**
 * If needCreate is true and there is no mapped file yet, or the last mapped file is full, this computes the starting
 * physical offset of the new mapped file (the offset is used as the name of the corresponding CommitLog file)
 * and then obtains the new mapped file through the {@link AllocateMappedFileService} preallocation service thread.
 * <p>
 * If needCreate is false, the last mapped file obtained is returned directly.
 *
 * @param startOffset
 * @param needCreate
 * @return
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    // Physical offset of the CommitLog for the mapped file to be created
    long createOffset = -1;
    // Get the last mapped file; if it is null or already full, the creation logic runs
    MappedFile mappedFileLast = getLastMappedFile();

    // The last mapped file is null, i.e. mappedFiles is empty: create a new mapped file (which is also the first one)
    if (mappedFileLast == null) {
        // Compute the physical offset of the mapped file to create:
        // if startOffset is smaller than mappedFileSize, start from offset 0;
        // otherwise start from startOffset rounded down to a multiple of mappedFileSize
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    // The last mapped file is full: create a new mapped file
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        // Compute the physical offset of the mapped file to create:
        // the starting offset of the previous CommitLog file plus the CommitLog file size
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    // Create the new mapped file
    if (createOffset != -1 && needCreate) {
        // Two mapped files are requested
        // Full paths of the CommitLog files
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        // Prefer allocateMappedFileService, which preallocates files and is therefore fast;
        // if no such service is configured, create the mapped file directly with new
        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}
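The createOffset calculation and the two requested file names can be tried out in isolation. The sketch below is a simplified stand-in, not RocketMQ code: the class and method names are hypothetical, and the 20-digit zero padding is an assumption about what UtilAll.offset2FileName produces.

```java
final class NextFilePathDemo {
    /** Offset of the first file when the queue is empty: startOffset rounded down to a file boundary. */
    static long createOffsetForEmptyQueue(long startOffset, long mappedFileSize) {
        return startOffset - (startOffset % mappedFileSize);
    }

    /** Offset of the next file when the last file is full. */
    static long createOffsetAfterFullFile(long lastFileFromOffset, long mappedFileSize) {
        return lastFileFromOffset + mappedFileSize;
    }

    /** Assumed naming scheme: the start offset, zero-padded to 20 digits. */
    static String fileName(long offset) {
        return String.format("%020d", offset);
    }

    public static void main(String[] args) {
        long size = 1024L * 1024 * 1024;
        long createOffset = createOffsetAfterFullFile(0L, size);
        // The file needed now and the one preallocated for later.
        System.out.println("next     = " + fileName(createOffset));
        System.out.println("nextNext = " + fileName(createOffset + size));
    }
}
```

The "next" file is the one the caller waits for; the "nextNext" file is only requested ahead of time so that it is likely to be ready when the current one fills up.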

org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // By default two requests can be submitted
    int canSubmitRequests = 2;
    // transientStorePool is enabled only when transientStorePoolEnable is true, FlushDiskType is ASYNC_FLUSH
    // and the broker is a master.
    // With the fail-fast policy enabled, take the number of buffers left in transientStorePool minus the number of
    // requests waiting in requestQueue; if the result is less than or equal to 0, fail fast.
    // TransientStorePool: the off-heap memory pool
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
            // Buffers still available in the off-heap pool (pool size is 5 by default) minus the number of queued requests
            canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
        }
    }

    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

    // Handle the first request first; the second one is submitted only if that is possible
    // If requestTable already holds an allocation request for this path, the request is already queued,
    // so there is no need to check the TransientStorePool buffers again or to add the request to requestQueue again
    if (nextPutOK) {
        if (canSubmitRequests <= 0) { // not enough buffers in the transient pool: fail fast
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }

    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }
    // mmapOperation hit an exception, so do not create a mapped file for now
    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            // Wait, with a default timeout of 5 seconds; for the creation itself see this#run
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }

    return null;
}

The actual creation happens in the thread's run method:

org.apache.rocketmq.store.AllocateMappedFileService#run

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped() && this.mmapOperation()) {

    }
    log.info(this.getServiceName() + " service end");
}

org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation

/**
 * Handles a request and creates the mapped file.
 * Returns false only when interrupted by an external thread.
 */
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        // Retrieve and remove the head of the queue, waiting if necessary until an element is available
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        // Sanity check: the AllocateRequest in requestQueue and the one in requestTable must be the same object
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            // Whether the off-heap memory pool is enabled; when it is, the SPI mechanism is tried first
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation."); // fall back to the default implementation on a runtime exception
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            // Measure how long creating the mapped file took
            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }


            // pre write mappedFile
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                // Warm up the mapped file
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }
            // Store the newly created mappedFile in the request
            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true; // mark that an exception occurred
        return false; // interrupted: end the service thread
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true; // mark that an exception occurred, but keep the service thread running
        if (null != req) {
            requestQueue.offer(req); // put the request back into the queue and retry
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown(); // wake up the thread waiting for the mapped file
    }
    return true;
}
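The interplay between putRequestAndReturnMappedFile and mmapOperation is a wait/notify handshake built on a CountDownLatch. The following is a stripped-down sketch of that pattern only; the Request class, the String standing in for a MappedFile, and the plain LinkedBlockingQueue are simplifications made for this example rather than RocketMQ's actual types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AllocateLatchDemo {
    /** Hypothetical stand-in for AllocateRequest: a path, a latch and a result slot. */
    static final class Request {
        final String filePath;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String mappedFile; // stands in for the real MappedFile
        Request(String filePath) { this.filePath = filePath; }
    }

    private final ConcurrentMap<String, Request> requestTable = new ConcurrentHashMap<>();
    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();

    /** Service-thread side: take one request, "create" the file, wake up the waiter. */
    void serveOne() throws InterruptedException {
        Request req = requestQueue.take();
        if (requestTable.get(req.filePath) != req) {
            return; // request expired or replaced, skip it
        }
        req.mappedFile = "mapped:" + req.filePath;
        req.latch.countDown();
    }

    /** Caller side: register the request once, then block until it is served or times out. */
    String putRequestAndWait(String filePath, long timeoutMillis) throws InterruptedException {
        Request req = new Request(filePath);
        if (requestTable.putIfAbsent(filePath, req) == null) {
            requestQueue.offer(req); // only the first registration is queued
        }
        Request result = requestTable.get(filePath);
        if (result == null || !result.latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return null; // timed out
        }
        requestTable.remove(filePath);
        return result.mappedFile;
    }

    /** Runs one request, optionally with a service thread, and returns the result (or null). */
    static String runOnce(String filePath, boolean startService, long timeoutMillis) {
        AllocateLatchDemo demo = new AllocateLatchDemo();
        Thread service = new Thread(() -> {
            try {
                while (true) demo.serveOne();
            } catch (InterruptedException e) {
                // interrupted: the service loop ends
            }
        });
        service.setDaemon(true);
        if (startService) service.start();
        try {
            return demo.putRequestAndWait(filePath, timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            service.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("00000000000000000000", true, 5000));
    }
}
```

The table deduplicates requests by path, the queue hands them to the service thread, and the latch is the only synchronization point between the two sides. Without a running service thread the caller simply times out and gets null.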

Warm-up:

/**
 * Warms up the current mapped file.
 * <p>
 * Specifically, a zero byte is first written into every memory page of the mapped file (I/O works in units of
 * memory pages, 4 KB each). With synchronous flushing, a forced flush is performed once every pages modified pages.
 * Then the whole address range of the mapped file is locked in physical memory so that it cannot be swapped out,
 * and madvise is called with the WILL_NEED advice to tell the kernel that this memory is about to be used,
 * so that the virtual-to-physical mapping is set up ahead of time.
 *
 * @param type  flush policy
 * @param pages number of pages per flush during warm-up
 */
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0; // byte position at which the last flush happened
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        // flush once every pages modified pages
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                // yield the CPU
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);

    this.mlock();
}

new MappedFile

There are two ways to create a mapped file:

public MappedFile(final String fileName, final int fileSize) throws IOException {
    init(fileName, fileSize);
}

public MappedFile(final String fileName, final int fileSize,
                  final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize, transientStorePool);
}

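Message writing has two corresponding modes, one per constructor: writing through mappedByteBuffer, or writing through a writeBuffer taken from the off-heap memory pool TransientStorePool.

http://img.cana.space/picStore/20201129132521.png

Both buffers and the fileChannel are fields of the MappedFile class.

Writing to disk

  • With mappedByteBuffer, a flush writes the data directly to disk.
  • With off-heap memory, the data in writeBuffer is first committed to fileChannel, and fileChannel is then flushed to write the messages into the CommitLog.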
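The two paths can be sketched with plain java.nio calls. This is an illustration under stated assumptions, not RocketMQ's implementation: the class name, the temporary file and the 4096-byte mapping are made up for the example, MappedByteBuffer.force() plays the role of the flush in the first path, and FileChannel.write followed by FileChannel.force plays the role of commit plus flush in the second.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TwoWritePathsDemo {
    /** Path 1: write into the mapped region, then force it to disk. */
    static void writeViaMappedBuffer(Path file, byte[] data, int fileSize) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            mapped.put(data);
            mapped.force(); // flush the mapped pages to the file
        }
    }

    /** Path 2: write into a direct (off-heap) buffer, copy it to the channel, then force the channel. */
    static void writeViaWriteBuffer(Path file, byte[] data) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(data.length);
        writeBuffer.put(data);
        writeBuffer.flip();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE)) {
            while (writeBuffer.hasRemaining()) {
                ch.write(writeBuffer); // "commit": off-heap buffer -> file channel
            }
            ch.force(false);           // "flush": file channel -> disk
        }
    }

    /** Writes "hello" to a temporary file through the chosen path and reads it back. */
    static String roundTrip(boolean useMappedBuffer) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            Path file = Files.createTempFile("commitlog-demo", ".bin");
            file.toFile().deleteOnExit();
            if (useMappedBuffer) {
                writeViaMappedBuffer(file, data, 4096);
            } else {
                writeViaWriteBuffer(file, data);
            }
            byte[] onDisk = Files.readAllBytes(file);
            return new String(onDisk, 0, data.length, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(true));
        System.out.println(roundTrip(false));
    }
}
```

The second path costs an extra copy from the off-heap buffer into the channel, but writes land in memory that the application controls until they are committed.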

About warm-up

this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

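This call only sets up the virtual memory mapping; at this point no physical memory backs it yet. Without warm-up, writing a message would have to look for physical memory and trigger a page fault. Warm-up writes one byte into every memory page (4 KB each) across the whole CommitLog size (1 GB by default), which prevents page faults when messages are actually written and improves write performance.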
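The page-touching part of warm-up can be reproduced on a small file. This is a minimal sketch, assuming a 4 KB page size and a temporary file; the class and method names are hypothetical, and it leaves out the periodic flushing and the memory locking that the real warm-up also performs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class WarmUpDemo {
    // Assumed page size, matching the 4 KB pages mentioned in the text.
    static final int OS_PAGE_SIZE = 4 * 1024;

    /** Maps the file and writes one zero byte into each page; returns the number of pages touched. */
    static int warm(Path file, int fileSize) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            int pages = 0;
            for (int i = 0; i < fileSize; i += OS_PAGE_SIZE) {
                mapped.put(i, (byte) 0); // touching the page makes the OS back it with physical memory
                pages++;
            }
            return pages;
        }
    }

    /** Convenience wrapper: warms a fresh temporary file of the given size. */
    static int warmTempFile(int fileSize) {
        try {
            Path file = Files.createTempFile("warmup-demo", ".bin");
            file.toFile().deleteOnExit();
            return warm(file, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("pages touched: " + warmTempFile(16 * OS_PAGE_SIZE));
    }
}
```

One write per page is enough, because a page fault is resolved for the whole page at once.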

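In addition, once the pages of the CommitLog have been loaded into memory, the operating system must be kept from paging them out again during page replacement (which would invalidate the corresponding page-table and TLB entries). These pages therefore need to be locked so that the operating system cannot evict them.

/**
 * Locks the whole address range of this mapped file in physical memory so that it cannot be swapped out,
 * then calls madvise with the WILL_NEED advice to tell the kernel that this memory is about to be used.
 */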
public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
      
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}

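mlock and madvise are APIs provided by Linux. They work as follows:

  • mlock keeps the caller's address range resident in physical memory; the lock can be released again when needed.
  • madvise passes the caller's expected data access pattern to Linux in the hope of getting better performance.

MMAP

Ordinary disk I/O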

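Why does the data have to be copied into the socket buffer?

Data held in user space is not stored at contiguous memory addresses, whereas data sent over the network must be contiguous. The data is therefore first copied into the socket buffer to form one contiguous region, then copied to the NIC and sent out.

Disk I/O with MMAP

With MMAP, data no longer has to be copied between the kernel buffer and the user buffer; the two share the same memory.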

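If the NIC driver supports gather operations, the socket buffer is not needed either, and the data in the kernel buffer is copied directly to the NIC. So-called zero copy does not mean that nothing is copied: it reduces four copies to three or two, and the copy that disappears is the one from the kernel buffer to the user buffer.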

https://gitee.com/lienhui68/picStore/raw/master/null/20200831200439.png

MappedFile fields

http://img.cana.space/picStore/20201129140311.png

Call flow for writing a message

  1. CommitLog#putMessage(final MessageExtBrokerInner msg)

  2. MappedFile#appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb)

  3. DefaultAppendMessageCallback#doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)

Source analysis

org.apache.rocketmq.store.CommitLog#putMessage

// Append the broker-internal Message to the MappedFile's memory; nothing is flushed to disk yet
result = mappedFile.appendMessage(msg, this.appendMessageCallback);

org.apache.rocketmq.store.MappedFile#appendMessagesInner

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        // Null checks on the arguments
        assert messageExt != null;
        assert cb != null;

        // Current write position of this MappedFile
        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) { // the file still has free space
            // Which buffer is used depends on the sync/async flush setup (async flush has two modes to choose from)
            // transientStorePool is enabled only when transientStorePoolEnable is true, FlushDiskType is ASYNC_FLUSH
            // and the broker is a master
            // The position of writeBuffer/mappedByteBuffer is always 0 and the limit always equals the capacity;
            // slice() builds the new ByteBuffer from position and limit
            // The two write modes correspond to writeBuffer and mappedByteBuffer: the former uses the memory pool, the latter does not
            // writeBuffer is borrowed from the off-heap pool: this.writeBuffer = transientStorePool.borrowBuffer();
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            // Run the append logic that matches the message type
            // Single message
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
                // Batch message
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // Advance the write position
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
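The slice-then-position idiom used above is worth seeing on its own: the slice shares the underlying bytes but has its own position, so the original buffer's position stays at 0 while the write position is tracked separately. A minimal sketch, with a heap buffer standing in for the mapped file and hypothetical names:

```java
import java.nio.ByteBuffer;

final class SliceAppendDemo {
    /** Appends data at wrotePosition through a slice; returns the new write position. */
    static int append(ByteBuffer fileBuffer, int wrotePosition, byte[] data) {
        ByteBuffer slice = fileBuffer.slice(); // shares the content, but has its own position and limit
        slice.position(wrotePosition);
        slice.put(data);
        return wrotePosition + data.length;
    }

    public static void main(String[] args) {
        ByteBuffer fileBuffer = ByteBuffer.allocate(16); // stands in for mappedByteBuffer
        int pos = 0;
        pos = append(fileBuffer, pos, new byte[] {1, 2, 3});
        pos = append(fileBuffer, pos, new byte[] {4, 5});
        System.out.println("write position = " + pos);
        System.out.println("original buffer position = " + fileBuffer.position());
    }
}
```

Keeping the write position in a separate counter (wrotePosition in MappedFile) rather than in the buffer itself is what allows each append to start from a fresh slice.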

org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)

/**
 * Appends a single message.
 *
 * @param fileFromOffset starting offset of the commitLog file; this is in fact the file name, normally a 20-digit number giving the offset at which the file starts
 * @param byteBuffer write buffer for the message (MappedFile#writeBuffer or MappedFile#mappedByteBuffer)
 * @param maxBlank free space available in the write buffer
 * @param msgInner the broker's internal wrapper of the message
 * @return
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                                    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    /**
     * wroteOffset: absolute physical position of the message
     * fileFromOffset: starting offset of a commitLog file (one MappedFile); the file name represents this offset
     * byteBuffer.position(): current write position in this MappedFile (one CommitLog segment file)
     */
    long wroteOffset = fileFromOffset + byteBuffer.position();

    // Reset hostHolder so that it can be reused
    this.resetByteBuffer(hostHolder, 8);
    // Build a unique MessageId from the broker's store address and the message's absolute physical position:
    // the first 4 bytes are the store host, bytes 5-8 are the store port, and the last 8 bytes are wroteOffset
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    // Number of messages in a given queue of a given topic; prepared for building the ConsumeQueue later
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    /**
     * Serialize the message's properties and topic
     */
    final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    // Compute the message length, i.e. the space it occupies in the commitLog; bodyLength, topicLength and propertiesLength are the variable parts
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Reject the message if it is too large
    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

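    /**
     * Determine whether the current CommitLog file has enough free space.
     * maxBlank: remaining space of the current CommitLog file (i.e. its MappedFile)
     * Design rule: a message must not span two CommitLog files
     * Each CommitLog file keeps 8 bytes in reserve to mark the end of the file
     */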
    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        // Magic code that marks the end of a CommitLog file
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // Message storage format
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);

    // From here on the broker-internal message is laid out field by field
    // 1 TOTALSIZE: 4 bytes for the message length; the size check above caps it (4 MB)
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE: magic code that marks a message
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC: cyclic redundancy check of the message body
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID: message queue ID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG: flag bits
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET: an auto-incrementing counter, not the real offset inside the consumeQueue
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET: physical offset at which the message is written
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG: flags for transactional message, compressed message, multi tags and so on, stored as a bitmap
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP: timestamp at which the client created the message
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST: client address, first 4 bytes host, last 4 bytes port
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP: timestamp at which the broker stored the message
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS: address of the storing broker, first 4 bytes host, last 4 bytes port
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write the message into the MappedFile
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    // Build the result to return
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
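The stored length of a message follows directly from the 17 fields written above. The sketch below adds up the field widths as they appear in the listing (putInt = 4 bytes, putLong = 8, the 8-byte host holders, a 1-byte topic length and a 2-byte properties length) and repeats the end-of-file check. The class name is hypothetical, and the real calMsgLength may differ between RocketMQ versions.

```java
final class MessageLengthDemo {
    // Space kept in reserve at the end of each file: TOTALSIZE + MAGICCODE of the end marker.
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    /** Sum of the field widths in the order they are written by doAppend. */
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                      // 1 TOTALSIZE
            + 4                       // 2 MAGICCODE
            + 4                       // 3 BODYCRC
            + 4                       // 4 QUEUEID
            + 4                       // 5 FLAG
            + 8                       // 6 QUEUEOFFSET
            + 8                       // 7 PHYSICALOFFSET
            + 4                       // 8 SYSFLAG
            + 8                       // 9 BORNTIMESTAMP
            + 8                       // 10 BORNHOST (4-byte host + 4-byte port)
            + 8                       // 11 STORETIMESTAMP
            + 8                       // 12 STOREHOSTADDRESS (4-byte host + 4-byte port)
            + 4                       // 13 RECONSUMETIMES
            + 8                       // 14 Prepared Transaction Offset
            + 4 + bodyLength          // 15 BODY (length prefix + data)
            + 1 + topicLength         // 16 TOPIC (1-byte length + data)
            + 2 + propertiesLength;   // 17 PROPERTIES (2-byte length + data)
    }

    /** Mirrors the check in doAppend: the message plus the end marker must fit into the free space. */
    static boolean fitsInFile(int msgLen, int maxBlank) {
        return msgLen + END_FILE_MIN_BLANK_LENGTH <= maxBlank;
    }

    public static void main(String[] args) {
        int msgLen = calMsgLength(10, 5, 0);
        System.out.println("msgLen = " + msgLen);
        System.out.println("fits into 100 free bytes: " + fitsInFile(msgLen, 100));
    }
}
```

With this layout the fixed overhead is 91 bytes per message. When a message does not fit, doAppend writes the end-of-file marker instead and returns END_OF_FILE.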