前言
思考
-
RocketMQ文件恢复的核心数据结构有哪些?
-
过期文件删除的条件有哪些?
文件恢复
CommitLog文件、ConsumeQueue文件、Index文件这3个文件在构建过程中会有不一致的情况发生,文件恢复就是指将这些文件由不一致做到最终一致。
CommitLog、ConsumeQueue存储形式以及数据结构

一个Broker上只有一份CommitLog文件,按照一定大小切割成多份文件,ConsumeQueue按照topic-queueId分成多个文件,也是再按照一定大小切割成多份文件。
ConsumeQueue中索引数据结构定长20个字节。CommitLogOffset对应physicalOffset,意思一样;MessageSize对应msgSize;queueOffset对应ConsumeQueue中索引数据的索引位置
RocketMQ文件恢复的必要性
由于ConsumeQueue和Index文件的构建是异步的,不可能做到强一致
- CommitLog已经刷盘的数据,ConsumeQueue/Index还没有构建完成?
- ConsumeQueue/Index已经构建完成并刷盘的数据,CommitLog没有完成刷盘或者CommitLog文件被破坏了一部分
- 恢复:删除ConsumeQueue/Index文件中的脏数据
- 所有的文件大小都是固定的,需要确定最近几个文件的具体刷盘位置?
恢复流程

load->恢复
ConsumeQueue的回复
-
MappedFile
- FlushedPosition 刷盘位置
- CommittedPosition 提交位置
- WrotePosition 内存映射位置(写的位置)
-
MappedFileQueue
- FlushedWhere 刷盘位置
- CommittedWhere 提交位置
-
ConsumeQueue
- maxPhysicOffset CommitLog文件中最大物理偏移
CommitLog的恢复
下面两个流程处理过程差别不是很大,上图中只是以异常恢复流程为例
- 正常恢复流程:broker正常退出
- 异常恢复流程:broker异常crash退出
主要恢复工作
- MappedFile
- FlushedPosition 刷盘位置
- CommittedPosition 提交位置
- WrotePosition 内存映射位置(写的位置)
- MappedFileQueue
- FlushedWhere 刷盘位置
- CommittedWhere 提交位置
- 重新构建ConsumeQueue、IndexFile
- CommitLog和ConsumeQueue要保持一致
恢复TopicQueueTable
-
HashMap<String/*topic-queueid*/, Log/*offset*/>
维护了每个topic-队列 对应的最大逻辑偏移,生产消息的时候用来定位ConsumeQueue当前的最大偏移位置
-
ConsumeQueue.correctMinOffset(int minPhyOffset)
minPhyOffset需要根据物理的offset进行修正
文件恢复源码分析
DefaultMessageStore#load
模拟正常恢复过程,生产消息->构建ConsumeQueue和Index->messageStore.shutdown -> load
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public boolean load() {
boolean result = true;
try {
// abort文件是否存在,约定broker正常关闭就会删掉这个文件,异常关闭就保留
// 根据是否存在这个文件可以知道上次是正常关闭还是异常关闭
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// 处理定时消息
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// 初始化CommitLog目录对应的所有MappedFile
// load Commit Log
result = result && this.commitLog.load();
// 初始化所有Topic下所有队列的MappedFile、ConsumeQueue初始化
// load Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
// 数据安全落地的CheckPoint,决定CommitLog文件从哪里开始恢复
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
// 索引文件初始化
this.indexService.load(lastExitOK);
// CommitLog/ConsumeQueue/Index开始恢复
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
|
org.apache.rocketmq.store.DefaultMessageStore#recover
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
private void recover(final boolean lastExitOK) {
// 恢复每个队列的最大逻辑偏移
// 恢复每个队列的最后一个消息在CommitLog绝对物理位置
// 但这些数据和CommitLog不一致,待会恢复CommitLog的时候需要重建或者删除脏数据
// 返回:在所有逻辑队列最后一个消息的CommitLog绝对物理位置,取最大值
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
// 正常恢复CommitLog文件,保持和ConsumeQueue一致
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
// 异常情况下恢复CommitLog文件,保持和ConsumeQueue一致
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
// 恢复CommitLog维护的这个hashmap:HashMap<String/*topic-queueid*/, Log/*offset*/>
// 每个Topic的每个队列的最大逻辑便宜
// 修正每个Topic的每个队列的最小逻辑偏移
this.recoverTopicQueueTable();
}
|
ConsumeQueue的恢复
org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
* 恢复队列
*
* @return
*/
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
// 遍历每个队列
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
// 恢复一个逻辑队列的数据,要遍历所有topic的所有队列
logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
// 恢复的过程中会记录每个逻辑队列最后一个消息在CommitLog文件中的绝对物理位置
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;
}
|
org.apache.rocketmq.store.ConsumeQueue#recover
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
public void recover() {
/**
* 恢复的数据
*
* MappedFile
* - FlushedPosition 刷盘位置
* - CommittedPosition 提交位置
* - WrotePosition 内存映射位置(写的位置)
* MappedFileQueue
* - FlushedWhere 刷盘位置
* - CommittedWhere 提交位置
* ConsumeQueue
* - minPhyOffset
*/
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// 从倒数第三个文件开始恢复,这样可以对虚拟内存做一个预热
// 只有最后一个文件需要做恢复,另两个文件只是进行一个预热
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
// 在某个MappedFile相对位置
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
// ConsumeQueue索引数据 结构
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
// 计算最大物理偏移
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
if (mappedFileOffset == mappedFileSizeLogics) {
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
// 设置mappedFileQueue恢复后的值
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
// 对最后一个文件进行偏移的计算,恢复偏移的值
this.mappedFileQueue.truncateDirtyFiles(processOffset);
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
|
org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
|
CommitLog的恢复
正常恢复
org.apache.rocketmq.store.CommitLog#recoverNormally
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
/**
* broker正常关闭时,CommitLog的恢复,和恢复ConsumeQueue很像
* 正常关闭情况下,ConsumeQueue落地的消息 >= CommitLog落地的消息,因为CommitLog还没有落盘,ConsumeQueue就已经开始构建了
* 从倒数第三个文件开始恢复
* 1.MappedFile
* - FlushedPosition 刷盘位置
* - CommittedPosition 提交位置
* - WrotePosition 内存映射位置(写的位置)
* 2.MappedFileQueue
* - FlushedWhere 刷盘位置
* - CommittedWhere 提交位置
* 3.清除ConsumeQueue的脏数据(如果ConsumeQueue落地的消息 > CommitLog落地的消息)
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
|
异常回复
org.apache.rocketmq.store.CommitLog#recoverAbnormally
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
/**
* broker异常时,CommitLog的恢复
* ConsumeQueue落地的消息 >= CommitLog落地的消息 或者 ConsumeQueue落地的消息 < CommitLog落地的消息
* 从最后一个文件尝试开始修复,第一个消息的落地时间小于StoreCheckPoint记录的时间,
* 这样可以确保这个时间点之前的CommitLog文件和ConsumeQueue文件是一致的,只有这个点之后的数据才可能存在不一致
*
* 1.- MappedFile
* - FlushedPosition 刷盘位置
* - CommittedPosition 提交位置
* - WrotePosition 内存映射位置(写的位置)
* 2.- MappedFileQueue
* - FlushedWhere 刷盘位置
* - CommittedWhere 提交位置
* 3.- 重新构建ConsumeQueue、IndexFile
* 4.- CommitLog和ConsumeQueue要保持一致,清除ConsumeQueue的脏数据
* 和正常恢复的区别:
* 正常
* - 从倒数第三个文件开始恢复
* 异常
* - 从最后一个文件
* @param maxPhyOffsetOfConsumeQueue
*/
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
mappedFileOffset += size;
// 重新构建ConsumeQueue和Index
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
// ConsumeQueue落地的数据 >= CommitLog落地的数据,需要清除ConsumeQueue的脏数据
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
|
恢复TopicQueueTable
org.apache.rocketmq.store.DefaultMessageStore#recoverTopicQueueTable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public void recoverTopicQueueTable() {
// 维护了每个topic-队列 对应的最大逻辑偏移,生产消息的时候用来定位ConsumeQueue当前的最大偏移位置
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQueue());
// minPhyOffset需要根据物理的offset进行修正
logic.correctMinOffset(minPhyOffset);
}
}
this.commitLog.setTopicQueueTable(table);
}
|
文件删除
哪些条件会触发RocketMQ删除文件
代码调用流程
-
DefaultMessageStore#addScheduleTask
将删除文件的过程放在一个定时任务里
-
DefaultMessageStore#cleanFilesPeriodically
定时任务定时清理CommitLog和ConsumeQueue,清理ConsumeQueue的同时会一并清理Index文件
- CleanCommitLogService#run
- CleanConsumeQueueService#run
- IndexService#deleteExpiredFiles
源码分析
org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
1
2
3
4
5
6
7
8
9
|
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
...
|
org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
1
2
3
4
5
6
|
private void cleanFilesPeriodically() {
// 删除CommitLog文件
this.cleanCommitLogService.run();
// 删除ConsumeQueue/Index文件
this.cleanConsumeQueueService.run();
}
|
下面看一下删除CommitLog的逻辑,删除ConsumeQueue和删除CommitLog逻辑类似
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#run
1
2
3
4
5
6
7
8
9
10
11
12
|
public void run() {
try {
// 删除过期文件
this.deleteExpiredFiles();
// 如果待删除的正在被读取,第一次和后面的一段时间将拒绝删除
// 一段时间之后挂起的文件将被强制删除
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
|
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
private void deleteExpiredFiles() {
int deleteCount = 0;
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 删除文件间隔,连续删除文件会占用大量的IO会造成抖动,删除一个文件之后休息一会
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 是否是删除过期文件时间,默认凌晨4点,负载比较低
boolean timeup = this.isTimeToDelete();
// 剩余磁盘空间是否足够
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
// 执行删除
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
|
org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isSpaceToDelete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
private boolean isSpaceToDelete() {
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
cleanImmediately = false;
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
// 当前CommitLog目录所在磁盘分区的磁盘使用率
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (physicRatio > diskSpaceWarningLevelRatio) {
// 设置磁盘不可写,会拒绝新消息的写入
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
}
// 触发立即删除文件
cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
// 不会拒绝新消息的写入
cleanImmediately = true;
} else {
// 清除标记,确保新消息可以写入
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
return true;
}
}
....
return false;
}
|