前言
在前面的章节中学习了消息是如何从producer发送,broker如何将消息放入内存,本章主要学习broker如何将消息从内存刷到磁盘。
思考题:
- 同步刷盘和异步刷盘的主要区别是什么?
- 同步刷盘和异步刷盘的使用场景?
同步刷盘和异步刷盘

同步刷盘
刷盘流程

如果每来一条消息就走一遍灰色区域的流程,那么肯定会很低效。rocketmq作为高性能的消息中间件,走的是右边的逻辑。
源码分析
入口
org.apache.rocketmq.store.CommitLog#putMessage
1
2
|
// 到这里位置,消息已经刷到pagecache里去了,接下来就是刷盘(同步刷盘或者异步刷盘)
handleDiskFlush(result, putMessageResult, msg);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// Decides how a message that was just appended gets flushed to disk.
// Only the synchronous-flush branch is shown in this excerpt.
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// Synchronous flush: the flush service is a GroupCommitService in this mode
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// The client wants to know whether the flush succeeded
if (messageExt.isWaitStoreMsgOK()) {
// nextOffset = offset where this message was written + number of bytes written
// (see the GroupCommitRequest structure)
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
// Block until the flush completes or the configured sync-flush timeout expires
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
// The broker is configured for synchronous flush, but the client does not care whether
// the flush has succeeded, so the strategy degrades to an asynchronous one: just wake the service
service.wakeup();
}
}
|
GroupCommitRequest
1
2
3
4
5
6
7
|
// A pending synchronous-flush request (only the fields are shown in this excerpt).
public static class GroupCommitRequest {
// Position in the MappedFile buffer up to which data must be flushed for this request
private final long nextOffset;
// Latch used to wait for the flush to complete
private final CountDownLatch countDownLatch = new CountDownLatch(1);
// Whether the flush succeeded
private volatile boolean flushOK = false;
|
org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest
1
2
3
4
5
6
7
8
9
10
11
|
/**
 * Queues a flush request and notifies the flush thread.
 *
 * @param request the request to add to the write queue
 */
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
// Append the request to the write queue
this.requestsWrite.add(request);
}
// The CAS succeeds only when no notification is pending: if one has already been delivered
// and the worker thread has not finished handling it, a new request does not notify again
if (hasNotified.compareAndSet(false, true)) {
// If the worker thread is waiting, this ends its wait early
waitPoint.countDown(); // notify
}
}
|
等待刷盘线程刷盘完通知当前线程继续后面的流程
1
2
|
// 等待MappedFile刷盘成功
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
|
查看刷盘线程运行流程
org.apache.rocketmq.store.CommitLog.GroupCommitService#run
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Loop of the group-commit flush thread (excerpt): wait, flush, repeat until stopped.
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// Wait for a notification; the wait can end early when requests arrive
this.waitForRunning(10);
// Perform the flush
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
|
org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
/**
 * Flushes the commit log on behalf of every request in the read queue and
 * wakes each waiting thread with the outcome of its request.
 *
 * <p>When the read queue is empty a plain flush is still issued, because
 * messages that are set to not wait for the flush never enqueue a request.
 */
private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // getFlushedWhere(): position of the commit log that is already on disk.
                // req.getNextOffset(): position this request needs to be on disk.
                boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                // There may be a message in the next file, so flush at most two times:
                // one flush only covers the mapped file at the current flushed position.
                // The flushed position is re-checked after EVERY flush, so a request that
                // is satisfied by the second flush is reported as successful instead of
                // being left with the stale result of the check made before that flush.
                for (int i = 0; i < 2 && !flushOK; i++) {
                    CommitLog.this.mappedFileQueue.flush(0);
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                }
                // Wake the thread that is waiting for the flush result
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            // Record the commit log's latest store timestamp in the checkpoint
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
|
org.apache.rocketmq.store.MappedFileQueue#flush
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
 * Flushes the mapped file located at the current flushed position.
 *
 * @param flushLeastPages passed through to the mapped file's flush; when it is 0
 *                        the store timestamp is updated as well
 * @return {@code true} when the flushed position did not move (or no mapped file
 *         was found), {@code false} when new data was flushed
 */
public boolean flush(final int flushLeastPages) {
    // Locate the mapped file by the most recent flushed position
    final MappedFile target = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (target == null) {
        return true;
    }

    final long timestampBeforeFlush = target.getStoreTimestamp();
    final int flushedInFile = target.flush(flushLeastPages);
    final long newFlushedWhere = target.getFileFromOffset() + flushedInFile;

    // Unchanged position means nothing new was flushed
    final boolean unchanged = newFlushedWhere == this.flushedWhere;
    this.flushedWhere = newFlushedWhere;
    if (flushLeastPages == 0) {
        this.storeTimestamp = timestampBeforeFlush;
    }
    return unchanged;
}
|
调用nio方法将数据落地
org.apache.rocketmq.store.MappedFile#flush
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
/**
 * Forces pending data of this mapped file to disk when enough has accumulated.
 *
 * @param flushLeastPages threshold handed to {@code isAbleToFlush}; configurable
 * @return The current flushed position
 */
public int flush(final int flushLeastPages) {
    // Not enough accumulated yet: leave it for a later, combined flush
    if (!this.isAbleToFlush(flushLeastPages)) {
        return this.getFlushedPosition();
    }

    if (!this.hold()) {
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
        return this.getFlushedPosition();
    }

    final int readPosition = getReadPosition();
    try {
        //We only append data to fileChannel or mappedByteBuffer, never both.
        final boolean viaFileChannel = writeBuffer != null || this.fileChannel.position() != 0;
        if (viaFileChannel) {
            // This is where the data actually reaches the disk
            this.fileChannel.force(false);
        } else {
            this.mappedByteBuffer.force();
        }
    } catch (Throwable e) {
        log.error("Error occurred when force data to disk.", e);
    }
    this.flushedPosition.set(readPosition);
    this.release();
    return this.getFlushedPosition();
}
|
异步刷盘
刷盘流程

源码分析
程序入口
org.apache.rocketmq.store.CommitLog#handleDiskFlush
1
2
3
4
5
6
7
8
9
10
11
|
// Asynchronous flush
// Asynchronous flush comes in two variants. One uses the same two layers as synchronous flush
// (page cache and disk). With isTransientStorePoolEnable an extra buffer, TransientStorePool, is added:
// data goes to TransientStorePool first, then to the page cache, then to disk.
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// TransientStorePool is not enabled: wake the flush service
flushCommitLogService.wakeup();
} else {
// TransientStorePool is enabled: wake the commit service
commitLogService.wakeup();
}
}
|
没有启动堆外内存时的异步刷盘流程
org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
/**
 * Loop of the asynchronous flush service: flushes the commit log in rounds until the
 * service is stopped, then keeps flushing before it exits.
 */
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// Switch for timed flushing
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// Time to sleep or wait before each flush round
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// Minimum number of pages per flush; reset to 0 below when a flush has to be forced
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// Even if the page minimum has not been reached, a flush is required once this much time has passed
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
// The thorough interval has elapsed: drop the page minimum for this round
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
// Progress is printed once every 10 such rounds
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
// A notification that data needs to be flushed ends the wait early
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
// The actual flush
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// Record the commit log's latest store timestamp in the checkpoint
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Flush on a clean exit
// Normal shutdown, to ensure that all the flush before exit
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
// Keep flushing while a flush still wrote new data, until one finds nothing new
// or the retry limit is reached
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
|
启动堆外内存时的异步刷盘流程
org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
class CommitRealTimeService extends FlushCommitLogService {
...
/**
 * Loop of the commit service used when TransientStorePool is enabled: commits data in
 * rounds and wakes the flush service whenever a round committed something.
 */
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// Interval between commit rounds
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
// Minimum number of pages per commit
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
// Even if the page minimum has not been reached, a commit is required once this much time has passed
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
// The thorough interval has elapsed: drop the page minimum for this round
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
// Commit the data held in off-heap memory; per the article author this ends in fileChannel.write
// (the commit implementation is not shown here)
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
if (!result) {
// Some data was committed to the FileChannel
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
// Wake the flushCommitLogService thread
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
// After the loop ends: keep committing until a commit reports nothing new or the retry limit is reached
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}
|
小结

ServiceThread#waitForRunning
很基础的方法,可以在Run函数里控制循环的节奏(支持通过wakeup唤醒)
使用到的地方:

源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
/**
 * Lets another thread wake this service thread.
 */
public void wakeup() {
    final boolean firstNotifier = hasNotified.compareAndSet(false, true);
    if (!firstNotifier) {
        // A notification is already pending; do not count down again
        return;
    }
    waitPoint.countDown(); // notify
}
/**
 * Paces the loop in run(): returns at once when a notification is pending, otherwise
 * waits on the latch for at most {@code interval} milliseconds (wakeup() ends the wait early).
 *
 * @param interval maximum time to wait, in milliseconds
 */
protected void waitForRunning(long interval) {
// A notification is already pending, i.e. there is data to process: consume it and skip the wait
if (hasNotified.compareAndSet(true, false)) {
// Empty method, provided as an extension hook
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
// Clear the flag on every exit path (timeout, notification or interrupt)
hasNotified.set(false);
this.onWaitEnd();
}
}
// Hook invoked by waitForRunning when it returns; empty here so that subclasses may override it.
protected void onWaitEnd() {
}
|
CountDownLatch2
多了个reset方法,可以将计数器进行复位。
同步刷盘和异步刷盘特点
同步刷盘
- 数据可靠性高
- 同步刷盘性能比异步刷盘性能要低,官方说差10%左右
- 适用于金融等对数据可靠性要求高的场景
异步刷盘
- Broker的性能和吞吐量高
- 客户端延时低
- Broker异常关闭时,可能会有少量消息丢失
为什么出现 TransientStorePool
rocketmq对消息读写进行的优化,下面是来自社区胡宗棠老师的总结
一般有两种方式进行读写
第一种,Mmap+PageCache的方式
读写消息都走的是pageCache,这样读和写都在pagecache里面进行,不可避免会有锁的问题;在并发读写的情况下,会出现缺页中断、内存加锁、污染页(脏页)回写等问题。
第二种,DirectByteBuffer(堆外内存)+PageCache的两层架构方式
这样可以实现消息的读写分离:写入消息时写到的是DirectByteBuffer(堆外内存),读消息走的是PageCache(对于DirectByteBuffer来说是两步刷盘:第一步刷到PageCache,第二步刷到磁盘文件)。带来的好处是避免了内存操作中很多容易阻塞的地方,降低了时延,比如减少了缺页中断、内存加锁和污染页(脏页)的回写。