
RocketMQ Source Code Analysis: File Recovery and Expired File Deletion

Preface

Questions to consider

  • What are the core data structures involved in RocketMQ file recovery?

  • What conditions trigger the deletion of expired files?

File Recovery

The CommitLog, ConsumeQueue, and Index files can become inconsistent with one another while they are being built. File recovery is the process of bringing them from an inconsistent state to eventual consistency.

Storage Layout and Data Structures of CommitLog and ConsumeQueue

http://img.cana.space/picStore/20201129210552.png

A broker has only one CommitLog, split into multiple files of a fixed size. The ConsumeQueue is divided by topic-queueId into multiple queues, each of which is likewise split into fixed-size files.

Each index entry in a ConsumeQueue is a fixed 20 bytes. CommitLogOffset corresponds to physicalOffset (same meaning); MessageSize corresponds to msgSize; queueOffset is the index position of the entry within the ConsumeQueue.
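As a minimal sketch of the 20-byte entry layout (the field values below are illustrative only, not RocketMQ's actual writer code), the three fields can be packed and read back with a ByteBuffer:

```java
import java.nio.ByteBuffer;

public class CqEntryLayout {
    // A ConsumeQueue index entry is a fixed 20 bytes:
    // 8-byte CommitLog physical offset + 4-byte message size + 8-byte tags hash code
    static final int CQ_STORE_UNIT_SIZE = 8 + 4 + 8;

    public static void main(String[] args) {
        ByteBuffer entry = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        entry.putLong(1048576L); // CommitLogOffset (physicalOffset)
        entry.putInt(256);       // MessageSize (msgSize)
        entry.putLong(98765L);   // tags hash code

        entry.flip();
        System.out.println(entry.getLong()); // 1048576
        System.out.println(entry.getInt());  // 256
        System.out.println(entry.getLong()); // 98765

        // queueOffset N locates the entry at byte position N * 20 inside the ConsumeQueue
        System.out.println(5L * CQ_STORE_UNIT_SIZE); // 100
    }
}
```

The fixed entry size is why recovery can walk a ConsumeQueue file in 20-byte steps: every entry sits at a multiple of CQ_STORE_UNIT_SIZE.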

Why RocketMQ Needs File Recovery

Because the ConsumeQueue and Index files are built asynchronously, they cannot be strongly consistent with the CommitLog:

  • Data already flushed to the CommitLog whose ConsumeQueue/Index entries have not been built yet
    • Recovery: re-dispatch the messages whose entries were never built
  • Data whose ConsumeQueue/Index entries have been built and flushed, while the CommitLog was not fully flushed or was partially corrupted
    • Recovery: delete the dirty data from the ConsumeQueue/Index files
  • All files have a fixed size, so the exact flush position within the most recent files must be determined

Recovery Flow

http://img.cana.space/picStore/20201129211821.png

load -> recover

Recovery of ConsumeQueue

  • MappedFile

    • FlushedPosition: flush position
    • CommittedPosition: commit position
    • WrotePosition: write position in the memory-mapped region
  • MappedFileQueue

    • FlushedWhere: flush position
    • CommittedWhere: commit position
  • ConsumeQueue

    • maxPhysicOffset: the maximum physical offset in the CommitLog

Recovery of CommitLog

The two flows below differ only slightly; the figure above shows the abnormal-recovery flow as an example.

  • Normal recovery: the broker exited normally
  • Abnormal recovery: the broker crashed

Main recovery work

  • MappedFile
    • FlushedPosition: flush position
    • CommittedPosition: commit position
    • WrotePosition: write position in the memory-mapped region
  • MappedFileQueue
    • FlushedWhere: flush position
    • CommittedWhere: commit position
  • Rebuild the ConsumeQueue and IndexFile
  • Keep the CommitLog and ConsumeQueue consistent

Recovering the TopicQueueTable

  • HashMap<String/*topic-queueid*/, Long/*offset*/>

    Maintains the maximum logical offset of each topic-queue pair; when producing a message it is used to locate the current maximum offset of the ConsumeQueue.

  • ConsumeQueue.correctMinOffset(int minPhyOffset)

    The minimum logical offset needs to be corrected against the physical offset minPhyOffset.

File Recovery Source Code Analysis

DefaultMessageStore#load

To simulate the normal recovery flow: produce messages -> build ConsumeQueue and Index -> messageStore.shutdown -> load

public boolean load() {
    boolean result = true;

    try {
        // Whether the abort file exists; by convention it is deleted on a normal broker shutdown
        // and kept on an abnormal one, so its presence tells whether the last exit was clean
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        // Handle scheduled (delayed) messages
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }

        // Initialize all MappedFiles under the CommitLog directory
        // load Commit Log
        result = result && this.commitLog.load();

        // Initialize the MappedFiles and ConsumeQueues for every queue of every topic
        // load Consume Queue
        result = result && this.loadConsumeQueue();

        if (result) {
            // Checkpoint of safely persisted data; decides where CommitLog recovery starts
            this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

            // Initialize the index files
            this.indexService.load(lastExitOK);

            // Start recovering CommitLog/ConsumeQueue/Index
            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}

org.apache.rocketmq.store.DefaultMessageStore#recover

private void recover(final boolean lastExitOK) {
    // Recover each queue's maximum logical offset and the absolute physical position
    // in the CommitLog of each queue's last message.
    // This data may still disagree with the CommitLog; dirty data is rebuilt or deleted
    // when the CommitLog itself is recovered below.
    // Returns the maximum, across all logic queues, of the last message's absolute physical position in the CommitLog
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

    if (lastExitOK) {
        // Normal recovery of the CommitLog files, keeping them consistent with the ConsumeQueues
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // Abnormal recovery of the CommitLog files, keeping them consistent with the ConsumeQueues
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }

    // Recover the hashmap maintained by CommitLog: HashMap<String/*topic-queueid*/, Long/*offset*/>
    // i.e. the maximum logical offset of every queue of every topic,
    // and correct the minimum logical offset of every queue of every topic
    this.recoverTopicQueueTable();
}

Recovery of ConsumeQueue

org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue

/**
 * Recover the consume queues
 *
 * @return the maximum physical offset in the CommitLog referenced by any ConsumeQueue
 */
private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    // Iterate over every queue of every topic
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            // Recover one logic queue; all queues of all topics are traversed
            logic.recover();
            if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                // Recovery records, for each logic queue, the absolute physical position in the CommitLog of its last message
                maxPhysicOffset = logic.getMaxPhysicOffset();
            }
        }
    }

    return maxPhysicOffset;
}

org.apache.rocketmq.store.ConsumeQueue#recover

public void recover() {
    /**
     * Data recovered here:
     *
     * MappedFile
     * - FlushedPosition: flush position
     * - CommittedPosition: commit position
     * - WrotePosition: write position in the memory-mapped region
     * MappedFileQueue
     * - FlushedWhere: flush position
     * - CommittedWhere: commit position
     * ConsumeQueue
     * - maxPhysicOffset
     */
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {

        // Start from the third-to-last file; scanning the earlier files pre-warms the virtual memory.
        // Only the last file actually needs recovery; the other two are only warmed up.
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();

        // Relative position within the current MappedFile
        long mappedFileOffset = 0;
        long maxExtAddr = 1;
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                // Layout of one ConsumeQueue index entry
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                long tagsCode = byteBuffer.getLong();

                // Track the maximum physical offset
                if (offset >= 0 && size > 0) {
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                    this.maxPhysicOffset = offset + size;
                    if (isExtAddr(tagsCode)) {
                        maxExtAddr = tagsCode;
                    }
                } else {
                    log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                    break;
                }
            }

            if (mappedFileOffset == mappedFileSizeLogics) {
                index++;
                if (index >= mappedFiles.size()) {

                    log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next consume queue file, " + mappedFile.getFileName());
                }
            } else {
                log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                break;
            }
        }

        // Set the recovered values on the mappedFileQueue
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Recompute the offsets of the last file and truncate any dirty data beyond processOffset
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}

org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles

public void truncateDirtyFiles(long offset) {
    List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

    for (MappedFile file : this.mappedFiles) {
        long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
        if (fileTailOffset > offset) {
            if (offset >= file.getFileFromOffset()) {
                // offset falls inside this file: rewind its write/commit/flush positions to the in-file position
                file.setWrotePosition((int) (offset % this.mappedFileSize));
                file.setCommittedPosition((int) (offset % this.mappedFileSize));
                file.setFlushedPosition((int) (offset % this.mappedFileSize));
            } else {
                // the whole file lies beyond offset: destroy it and mark it for removal
                file.destroy(1000);
                willRemoveFiles.add(file);
            }
        }
        }
    }

    this.deleteExpiredFile(willRemoveFiles);
}
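The position arithmetic above can be checked in isolation: truncateDirtyFiles rewinds the file containing the recovery offset to `offset % mappedFileSize` and discards files that start beyond it. A minimal sketch with hypothetical numbers, assuming a 1 GB file size:

```java
public class TruncateMath {
    public static void main(String[] args) {
        long mappedFileSize = 1024L * 1024 * 1024; // assumed CommitLog file size: 1 GB
        long fileFromOffset = 2 * mappedFileSize;  // third file starts at absolute offset 2 GB
        long processOffset = fileFromOffset + 777; // recovery stopped 777 bytes into that file

        // A file is touched only if its tail lies past the recovery offset
        long fileTailOffset = fileFromOffset + mappedFileSize;
        boolean containsOffset = fileTailOffset > processOffset && processOffset >= fileFromOffset;
        // The in-file write/commit/flush position after truncation
        int newPosition = (int) (processOffset % mappedFileSize);

        System.out.println(containsOffset); // true
        System.out.println(newPosition);    // 777

        // A later file (starting at 3 GB) begins beyond processOffset, so it would be destroyed
        long nextFileFromOffset = 3 * mappedFileSize;
        System.out.println(processOffset >= nextFileFromOffset); // false
    }
}
```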

Recovery of CommitLog

Normal Recovery

org.apache.rocketmq.store.CommitLog#recoverNormally

/**
 * CommitLog recovery when the broker shut down normally; very similar to ConsumeQueue recovery.
 * After a normal shutdown, the messages persisted in the ConsumeQueue can be >= those persisted in the CommitLog,
 * because ConsumeQueue building starts before the CommitLog has finished flushing.
 * Recovery starts from the third-to-last file.
 * 1.MappedFile
 * - FlushedPosition: flush position
 * - CommittedPosition: commit position
 * - WrotePosition: write position in the memory-mapped region
 * 2.MappedFileQueue
 * - FlushedWhere: flush position
 * - CommittedWhere: commit position
 * 3.Clear dirty data from the ConsumeQueue (if the ConsumeQueue persisted more messages than the CommitLog)
 * When the normal exit, data recovery, all memory data have been flush
 */
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Began to recover from the last third file
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            // Come the end of the file, switch to the next file Since the
            // return 0 representatives met last hole,
            // this can not be included in truncate offset
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }

        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        // Commitlog case files are deleted
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

Abnormal Recovery

org.apache.rocketmq.store.CommitLog#recoverAbnormally

/**
 * CommitLog recovery when the broker crashed.
 * The messages persisted in the ConsumeQueue may be >= or < those persisted in the CommitLog.
 * Starting from the last file, search backwards for the file whose first message was stored
 * earlier than the time recorded in the StoreCheckPoint; the CommitLog and ConsumeQueue are
 * guaranteed consistent before that point, and only data after it can be inconsistent.
 *
 * 1.- MappedFile
 *   - FlushedPosition: flush position
 *   - CommittedPosition: commit position
 *   - WrotePosition: write position in the memory-mapped region
 * 2.- MappedFileQueue
 *   - FlushedWhere: flush position
 *   - CommittedWhere: commit position
 * 3.- Rebuild the ConsumeQueue and IndexFile
 * 4.- Keep the CommitLog and ConsumeQueue consistent; clear dirty ConsumeQueue data
 * Difference from normal recovery:
 * Normal
 *  - starts from the third-to-last file
 * Abnormal
 *  - starts from the last matching file
 * @param maxPhyOffsetOfConsumeQueue
 */
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    // recover by the minimum time stamp
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Looking beginning to recover from which file
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }

        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }

        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();

            if (dispatchRequest.isSuccess()) {
                // Normal data
                if (size > 0) {
                    mappedFileOffset += size;

                    // Rebuild the ConsumeQueue and Index entries
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                // Come the end of the file, switch to the next file
                // Since the return 0 representatives met last hole, this can
                // not be included in truncate offset
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }

        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            // The ConsumeQueue persisted more data than the CommitLog; clear the dirty ConsumeQueue data
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

Recovering the TopicQueueTable

org.apache.rocketmq.store.DefaultMessageStore#recoverTopicQueueTable

public void recoverTopicQueueTable() {
        // Maintains the maximum logical offset of each topic-queue pair; used when producing messages to locate the current maximum ConsumeQueue offset
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                table.put(key, logic.getMaxOffsetInQueue());
                // The minimum logical offset needs to be corrected against the physical offset minPhyOffset
                logic.correctMinOffset(minPhyOffset);
            }
        }

        this.commitLog.setTopicQueueTable(table);
    }

File Deletion

What Conditions Trigger File Deletion in RocketMQ

  • If a file other than the current write file has not been updated within a given interval, it is considered expired and may be deleted.

  • If disk space is insufficient, expired-file deletion is also triggered.

  • executeDeleteFilesManualy allows expired files to be deleted manually, but mqAdmin currently exposes no interface for it.

Code Call Flow

  • DefaultMessageStore#addScheduleTask

    Runs the file-deletion logic inside a scheduled task

  • DefaultMessageStore#cleanFilesPeriodically

    The scheduled task periodically cleans the CommitLog and ConsumeQueue; cleaning the ConsumeQueue also cleans the Index files

    • CleanCommitLogService#run
    • CleanConsumeQueueService#run
      • IndexService#deleteExpiredFiles

Source Code Analysis

org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask

private void addScheduleTask() {

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
  ...

org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically

private void cleanFilesPeriodically() {
        // Delete CommitLog files
        this.cleanCommitLogService.run();
        // Delete ConsumeQueue/Index files
        this.cleanConsumeQueueService.run();
    }

Next, look at the CommitLog deletion logic; deleting ConsumeQueue files works much the same way.

org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#run

public void run() {
    try {
        // Delete expired files
        this.deleteExpiredFiles();

        // If a file slated for deletion is still being read, the first deletion attempt and those
        // shortly after are refused; after a while the hanging file is forcibly deleted
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles

private void deleteExpiredFiles() {
    int deleteCount = 0;
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // Interval between file deletions: deleting files back-to-back causes heavy IO and jitter, so rest briefly after each file
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // Whether it is time to delete expired files; defaults to 4 AM, when load is low
    boolean timeup = this.isTimeToDelete();
    // Whether remaining disk space is running short (deletion needed)
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                fileReservedTime,
                timeup,
                spacefull,
                manualDeleteFileSeveralTimes,
                cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        // Perform the deletion
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
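The retention check above works in milliseconds: fileReservedTime is configured in hours (72 by default; treat that value as an assumption and check your MessageStoreConfig) and converted before being compared with a file's last-modified timestamp. A small worked example:

```java
public class ReservedTimeMath {
    public static void main(String[] args) {
        // Assumed default: keep CommitLog files for 72 hours
        long fileReservedTime = 72;
        fileReservedTime *= 60 * 60 * 1000; // hours -> milliseconds, as in deleteExpiredFiles
        System.out.println(fileReservedTime); // 259200000

        // A file counts as expired once now - lastModified >= fileReservedTime
        long now = System.currentTimeMillis();
        long lastModified = now - fileReservedTime - 1;
        System.out.println(now - lastModified >= fileReservedTime); // true
    }
}
```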

org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceToDelete

private boolean isSpaceToDelete() {
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

    cleanImmediately = false;

    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        // Disk usage ratio of the partition holding the CommitLog directory
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        if (physicRatio > diskSpaceWarningLevelRatio) {
            // Mark the disk as not writable; new message writes will be rejected
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }

            // Trigger immediate file deletion
            cleanImmediately = true;
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {
            // New message writes are not rejected
            cleanImmediately = true;
        } else {
            // Clear the flag to ensure new messages can be written
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
            }
        }

        if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
        }
    }
  ....

    return false;
}
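The three thresholds in isSpaceToDelete form a ladder. The sketch below reimplements the decision with commonly cited defaults (0.90 warning level, 0.85 forcible clean, 75% diskMaxUsedSpaceRatio); all three are configurable in RocketMQ, so treat the concrete numbers as assumptions:

```java
public class DiskUsageLadder {
    static final double WARNING_LEVEL_RATIO = 0.90;  // mark disk full, reject writes, clean immediately
    static final double CLEAN_FORCIBLY_RATIO = 0.85; // clean immediately, but keep accepting writes
    static final double MAX_USED_SPACE_RATIO = 0.75; // plain "space full" deletion trigger

    /** Classify a disk-usage ratio the way isSpaceToDelete reacts to it. */
    static String classify(double physicRatio) {
        if (physicRatio > WARNING_LEVEL_RATIO) return "FULL";         // getAndMakeDiskFull + cleanImmediately
        if (physicRatio > CLEAN_FORCIBLY_RATIO) return "FORCE_CLEAN"; // cleanImmediately only
        if (physicRatio > MAX_USED_SPACE_RATIO) return "CLEAN";       // isSpaceToDelete returns true
        return "OK";                                                  // getAndMakeDiskOK
    }

    public static void main(String[] args) {
        System.out.println(classify(0.95)); // FULL
        System.out.println(classify(0.87)); // FORCE_CLEAN
        System.out.println(classify(0.80)); // CLEAN
        System.out.println(classify(0.50)); // OK
    }
}
```

Note the ladder is evaluated top-down, so a ratio above the warning level both rejects writes and forces cleaning, while a ratio between 75% and 85% merely makes isSpaceToDelete return true.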