
RocketMQ Source Code Analysis: Message Storage, the Flush-to-Disk Mechanism

Preface

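The previous chapters covered how the producer sends messages and how the broker puts them into memory. This chapter looks at how the broker flushes messages from memory to disk.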

Questions to consider:

  • What is the main difference between synchronous and asynchronous flushing?
  • In which scenarios should each one be used?

Synchronous and asynchronous flushing

http://img.cana.space/picStore/20201129152451.png

Synchronous flush

Flush flow

(Figure: synchronous flush flow, image 20201129152602)

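If every incoming message went through the flow in the gray area of the figure on its own, flushing would certainly be inefficient. As a high-performance messaging middleware, RocketMQ follows the logic on the right instead.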

Source code analysis

Entry point

org.apache.rocketmq.store.CommitLog#putMessage

// At this point the message has already been written into the page cache; next comes the flush (synchronous or asynchronous)
handleDiskFlush(result, putMessageResult, msg);
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // Synchronous flush
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // The client wants to know whether the flush succeeded
        if (messageExt.isWaitStoreMsgOK()) {
            // nextOffset: current write offset in memory + number of bytes written by this message
            // See the GroupCommitRequest structure
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            // Wait until the MappedFile flush succeeds
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
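            // The broker is configured for synchronous flush, but the client does not care whether the flush succeeded, so the sync strategy degrades to the async one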
            service.wakeup();
        }
    }

GroupCommitRequest

public static class GroupCommitRequest {
    // Offset in the MappedFile buffer that the flush must reach
    private final long nextOffset;
    // Signals that the flush has completed
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    // Whether the flush succeeded
    private volatile boolean flushOK = false;
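These three fields hand the flush result from the flush thread back to the sending thread: `req.wakeupCustomer(flushOK)` is called in `doCommit`, and `request.waitForFlush(...)` is called in `handleDiskFlush` with the `syncFlushTimeout` setting. The following is a minimal sketch of how such a request could work, assuming the latch is used as a one-shot signal; the class name `FlushRequest` is hypothetical and the method bodies are an assumption, not RocketMQ's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the fields mirror GroupCommitRequest; the method bodies are assumptions.
class FlushRequest {
    // Offset the flush must reach (end of this message in the CommitLog)
    private final long nextOffset;
    // Released exactly once, by the flush thread
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    // Result written by the flush thread before it releases the latch
    private volatile boolean flushOK = false;

    FlushRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Flush thread: publish the result, then release the waiting sender.
    void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    // Sending thread: block up to timeoutMillis; false means timeout or a failed flush.
    boolean waitForFlush(long timeoutMillis) {
        try {
            boolean released = countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return released && flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return false;
        }
    }
}
```

A `false` return is the case `handleDiskFlush` maps to `PutMessageStatus.FLUSH_DISK_TIMEOUT`.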

org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest

public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        // Add the request to the write queue
        this.requestsWrite.add(request);
    }
    if (hasNotified.compareAndSet(false, true)) {
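        // If a notification was already sent and the worker thread has not finished handling it, new requests need not notify again
        // Otherwise, if the worker thread is waiting, wake it up early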
        waitPoint.countDown(); // notify
    }
}
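`putRequest` only appends to `requestsWrite`, while `doCommit` only iterates `requestsRead`. The swap that connects the two lists is not shown in this article; in RocketMQ 4.x it is, as far as we can tell, done in `GroupCommitService`'s override of the `onWaitEnd()` hook each time the flush thread finishes waiting. The sketch below is a single-threaded simulation of that double buffer, with offsets as plain numbers and no real I/O; `GroupCommitBuffer`, `writtenWhere` and `flushCalls` are hypothetical names. It shows why group commit is cheap: one simulated flush acknowledges a whole batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of group commit with a write list and a read list (no real I/O).
class GroupCommitBuffer {
    static final class Request {
        final long nextOffset; // the flush must reach this offset
        Boolean result;        // null until the flusher has handled the request

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();
    long writtenWhere = 0; // simulated page-cache position (set by the demo)
    long flushedWhere = 0; // simulated durable position
    int flushCalls = 0;    // how many simulated fsyncs were issued

    // Producers hold the lock only long enough to append to the write list.
    synchronized void putRequest(Request request) {
        requestsWrite.add(request);
    }

    // The flusher swaps the lists: producers keep appending to an empty list
    // while the flusher works through the previous batch.
    synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Flusher thread only: one simulated flush covers every byte written so far,
    // so a whole batch is usually acknowledged by a single fsync.
    void doCommit() {
        for (Request req : requestsRead) {
            if (flushedWhere < req.nextOffset) {
                flushedWhere = writtenWhere;
                flushCalls++;
            }
            req.result = flushedWhere >= req.nextOffset;
        }
        requestsRead.clear();
    }
}
```

Requests that arrive after a swap land in the other list and simply wait for the next round.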

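The current thread then waits until the flush thread has finished flushing and notifies it, and only then continues with the rest of the flow: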

// Wait until the MappedFile flush succeeds
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

Next, the flush thread's run loop:

org.apache.rocketmq.store.CommitLog.GroupCommitService#run

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Wait for a notification; the wait ends early if new data arrives
            this.waitForRunning(10);
            // Perform the flush
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit

private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            // Iterate over the read queue
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    // CommitLog.this.mappedFileQueue.getFlushedWhere(): offset up to which the CommitLog is already on disk
                    // req.getNextOffset(): offset this request needs flushed, in the MappedFile buffer
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
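                    // Why flush up to twice?
                    // A single flush() can at most flush the current (not yet flushed) MappedFile
                    // If a new MappedFile was created during this cycle, a second flush is triggered for it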
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                // Wake up the thread waiting for the flush result
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            // Update the CommitLog's last persisted timestamp in the checkpoint file
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
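The "two flushes" comment can be checked with a small offset-only simulation. `MappedFileQueue#flush` looks up the file containing `flushedWhere`, so one call never advances past that file's end; when a message lands in a freshly created file, the first call finishes the old file and the second reaches the new one. Note that in the loop as quoted, `flushOK` is evaluated before each flush, so the outcome of the second flush is not re-checked inside the loop; the sketch re-checks after every flush instead. `FlushSimulation` and its 1000-byte file size are hypothetical.

```java
// Hypothetical simulation of a queue of fixed-size files, tracking offsets only.
class FlushSimulation {
    static final long FILE_SIZE = 1000; // hypothetical file size in bytes

    long writtenWhere; // total bytes appended across all files
    long flushedWhere; // everything before this offset is "on disk"
    int flushCalls;    // number of flush() calls made

    FlushSimulation(long flushedWhere, long writtenWhere) {
        this.flushedWhere = flushedWhere;
        this.writtenWhere = writtenWhere;
    }

    // Like MappedFileQueue#flush(0): only the file containing flushedWhere is flushed.
    // Returns true when nothing new was flushed.
    boolean flush() {
        flushCalls++;
        long fileStart = (flushedWhere / FILE_SIZE) * FILE_SIZE;
        long where = Math.min(writtenWhere, fileStart + FILE_SIZE);
        boolean nothingNew = where == flushedWhere;
        flushedWhere = where;
        return nothingNew;
    }

    // Group-commit check with at most two flushes, re-checking after each flush.
    boolean commit(long nextOffset) {
        boolean flushOK = flushedWhere >= nextOffset;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flush();
            flushOK = flushedWhere >= nextOffset;
        }
        return flushOK;
    }
}
```

For example, starting at `flushedWhere = 980` with data written up to 1200, a request for offset 1200 needs two flush calls: one to 1000 (end of the first file) and one to 1200.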

org.apache.rocketmq.store.MappedFileQueue#flush

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // Find the MappedFile that contains the latest flush position
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        // Returns false if new data was flushed, otherwise true
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

NIO methods are called to persist the data to disk:

org.apache.rocketmq.store.MappedFile#flush

/**
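 * flushLeastPages: how many pages of unflushed data must accumulate before flushing (configurable)
 * @return The current flushed position
 */
public int flush(final int flushLeastPages) {
    // Flush only if the accumulated unflushed data reaches flushLeastPages; otherwise flush it later together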
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // Only here is the data actually persisted to disk
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
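`isAbleToFlush` is not shown above. The sketch below assumes it works in whole OS pages (4 KB assumed): a full file always flushes, a positive `flushLeastPages` compares page counts, and 0 flushes whatever is new. It uses only the `MappedByteBuffer.force()` path; when a `writeBuffer` is in use, the excerpt calls `fileChannel.force(false)` instead. `MiniMappedFile` is a hypothetical, single-threaded stand-in without reference counting, not RocketMQ's `MappedFile`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical, heavily simplified stand-in for MappedFile (single-threaded, no hold/release).
class MiniMappedFile implements AutoCloseable {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size for the threshold

    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final int fileSize;
    private int wrotePosition = 0;   // bytes appended through the mapping (page cache)
    private int flushedPosition = 0; // bytes known to be forced to disk

    MiniMappedFile(Path path, int fileSize) throws IOException {
        this.fileSize = fileSize;
        this.fileChannel = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        // READ_WRITE mapping; the file grows to fileSize if it is shorter
        this.mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
    }

    // Writes into the mapped region only; nothing is forced to disk here.
    boolean append(byte[] data) {
        if (wrotePosition + data.length > fileSize) {
            return false;
        }
        ByteBuffer view = mappedByteBuffer.duplicate();
        view.position(wrotePosition);
        view.put(data);
        wrotePosition += data.length;
        return true;
    }

    // Assumed threshold rule, counted in whole pages.
    private boolean isAbleToFlush(int flushLeastPages) {
        if (wrotePosition == fileSize) {
            return true; // a full file is always flushed
        }
        if (flushLeastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return wrotePosition > flushedPosition; // 0 means "flush whatever is new"
    }

    // Returns the flushed position, like MappedFile#flush in the excerpt above.
    int flush(int flushLeastPages) {
        if (isAbleToFlush(flushLeastPages)) {
            int value = wrotePosition;
            mappedByteBuffer.force(); // write the mapping's dirty pages back to the device
            flushedPosition = value;
        }
        return flushedPosition;
    }

    @Override
    public void close() throws IOException {
        fileChannel.close();
    }
}
```

With 5000 bytes written, only one full 4 KB page has accumulated, so `flush(2)` does nothing while `flush(1)` and `flush(0)` flush everything.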

Asynchronous flush

Flush flow

http://img.cana.space/picStore/20201129160256.png

Source code analysis

Entry point

org.apache.rocketmq.store.CommitLog#handleDiskFlush

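// Asynchronous flush
// Async flush has two options. One, like sync flush, uses two layers (page cache and disk).
// With isTransientStorePoolEnable, an extra buffer layer (TransientStorePool) is added: data goes to the TransientStorePool first, then to the page cache, then to disk.
else {
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // TransientStorePool not enabled: wake up the flush thread
        flushCommitLogService.wakeup();
    } else {
        // TransientStorePool enabled: wake up the commit thread
        commitLogService.wakeup();
    }
}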

Asynchronous flush flow without off-heap memory (TransientStorePool disabled)

org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        // Timed-flush switch: true = sleep a fixed interval, false = wait but allow an early wakeup
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        // Flush interval
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of pages per flush; set to 0 below when a thorough flush is due
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // Even if the minimum page count is not reached, flush once this much time has passed
        int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                // A notification that there is data to persist ends the wait early
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            long begin = System.currentTimeMillis();
            // The actual flush
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                // Update the CommitLog's last persisted timestamp in the checkpoint
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Flush everything on a clean shutdown
    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        // flush() returns false while new data keeps getting flushed, so loop until nothing is left
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
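Two decisions in this loop are easy to isolate: when to drop the page threshold to 0 (a "thorough" flush once `flushCommitLogThoroughInterval` has elapsed), and the shutdown loop that keeps calling `flush(0)` until it reports nothing new. The sketch below pulls both out as plain functions so they can be tested without a broker; `FlushPolicy`, `leastPagesFor` and `flushUntilClean` are hypothetical names, and the numbers in the usage are illustrative, not RocketMQ defaults.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper isolating two decisions from FlushRealTimeService#run.
class FlushPolicy {
    private final int leastPages;          // plays the role of flushCommitLogLeastPages
    private final long thoroughIntervalMs; // plays the role of flushCommitLogThoroughInterval
    private long lastFlushTimestamp = 0;

    FlushPolicy(int leastPages, long thoroughIntervalMs) {
        this.leastPages = leastPages;
        this.thoroughIntervalMs = thoroughIntervalMs;
    }

    // Page threshold to pass to flush(): 0 forces a thorough flush of all dirty data.
    int leastPagesFor(long nowMillis) {
        if (nowMillis >= lastFlushTimestamp + thoroughIntervalMs) {
            lastFlushTimestamp = nowMillis;
            return 0;
        }
        return leastPages;
    }

    // Shutdown loop: flushOnce returns true when it found nothing new to flush.
    // Returns how many attempts were made (at most maxRetries).
    static int flushUntilClean(BooleanSupplier flushOnce, int maxRetries) {
        boolean result = false;
        int attempts = 0;
        while (attempts < maxRetries && !result) {
            result = flushOnce.getAsBoolean();
            attempts++;
        }
        return attempts;
    }
}
```

With a 4-page threshold and a 10 s thorough interval, calls at t = 5 s, 10 s, 15 s and 20 s yield 4, 0, 4, 0.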

Asynchronous flush flow with off-heap memory (TransientStorePool enabled)

org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run

class CommitRealTimeService extends FlushCommitLogService {
    ...
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Commit interval
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // Minimum number of pages per commit
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            // Even if the minimum page count is not reached, commit once this much time has passed
            int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                // Commit data from off-heap memory into the file's page cache; ultimately calls fileChannel.write
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    // Some data was committed to the FileChannel
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    // Wake up the flushCommitLogService thread
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
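With the TransientStorePool enabled, data therefore moves in two steps: the commit thread copies it from the off-heap write buffer into the file through `FileChannel` (page cache), then wakes the flush thread, which forces it to disk. The sketch below runs both stages as methods on one thread so the positions can be inspected; `TwoStageFile` and its method names are hypothetical, and it borrows the quoted convention that a commit returns `false` when data was committed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical two-stage file: append -> commit (to the page cache) -> flush (to disk).
// Single-threaded sketch; in RocketMQ the two stages run on different service threads.
class TwoStageFile implements AutoCloseable {
    private final FileChannel fileChannel;
    private final ByteBuffer writeBuffer; // off-heap staging buffer
    private int wrotePosition = 0;        // bytes placed in writeBuffer
    private int committedPosition = 0;    // bytes handed to the FileChannel
    private int flushedPosition = 0;      // bytes forced to the device

    TwoStageFile(Path path, int fileSize) throws IOException {
        this.fileChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.writeBuffer = ByteBuffer.allocateDirect(fileSize);
    }

    // Producer path: only touches off-heap memory.
    boolean append(byte[] data) {
        if (wrotePosition + data.length > writeBuffer.capacity()) {
            return false;
        }
        ByteBuffer view = writeBuffer.duplicate();
        view.position(wrotePosition);
        view.put(data);
        wrotePosition += data.length;
        return true;
    }

    // Stage 1: write [committedPosition, wrotePosition) through the FileChannel.
    // Returns false when data was committed, true when there was nothing new.
    boolean commit() throws IOException {
        if (committedPosition == wrotePosition) {
            return true;
        }
        ByteBuffer pending = writeBuffer.duplicate();
        pending.position(committedPosition);
        pending.limit(wrotePosition);
        long filePos = committedPosition;
        while (pending.hasRemaining()) {
            filePos += fileChannel.write(pending, filePos); // positional write
        }
        committedPosition = wrotePosition;
        return false;
    }

    // Stage 2: force what has been committed so far to the storage device.
    int flush() throws IOException {
        if (flushedPosition < committedPosition) {
            int value = committedPosition;
            fileChannel.force(false); // file content only, metadata not required
            flushedPosition = value;
        }
        return flushedPosition;
    }

    long fileSize() throws IOException {
        return fileChannel.size();
    }

    @Override
    public void close() throws IOException {
        fileChannel.close();
    }
}
```

Until `commit()` runs, appended bytes exist only in the direct buffer, so a crash of the process at that point loses them.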

Summary

http://img.cana.space/picStore/20201129163431.png

ServiceThread#waitForRunning

A very basic method that controls the pace of the loop inside run(); the wait can be cut short by calling wakeup().

Where it is used:

http://img.cana.space/picStore/20201129163820.png

Source:

// Called by another thread to wake up this thread
public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        // Avoid counting down more than once
        waitPoint.countDown(); // notify
    }
}

// Controls the pace of the loop in run() (the wait can be ended early via wakeup())
protected void waitForRunning(long interval) {
    // Already notified, meaning there is data to process: end the wait early
    if (hasNotified.compareAndSet(true, false)) {
        // Empty hook that subclasses can override
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

protected void onWaitEnd() {
}

CountDownLatch2

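Compared with the JDK's CountDownLatch, it adds a reset() method that restores the counter to its initial value, so the same latch can be reused for every wait.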
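A resettable latch can be built on `AbstractQueuedSynchronizer` the same way the JDK's `CountDownLatch` is, plus a `reset()` that restores the saved start count. The sketch below does that and pairs it with the `wakeup`/`waitForRunning` handshake shown above. `ResettableLatch` and `MiniServiceThread` are hypothetical simplifications, not RocketMQ's `CountDownLatch2` or `ServiceThread`; the `waitEnds` counter exists only to make the behavior observable.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical resettable count-down latch in the spirit of CountDownLatch2.
class ResettableLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1; // waiters pass once the count reaches zero
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;
                }
                if (compareAndSetState(c, c - 1)) {
                    return c - 1 == 0;
                }
            }
        }

        void reset() {
            setState(startCount); // the extra capability CountDownLatch lacks
        }
    }

    private final Sync sync;

    ResettableLatch(int count) {
        this.sync = new Sync(count);
    }

    void countDown() { sync.releaseShared(1); }

    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    int getCount() { return sync.getCount(); }

    void reset() { sync.reset(); }
}

// Hypothetical mini service thread: only the wait/wakeup handshake, no thread management.
class MiniServiceThread {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private final ResettableLatch waitPoint = new ResettableLatch(1);
    int waitEnds = 0; // counts onWaitEnd() calls, for demonstration only

    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // only the first wakeup per round counts down
        }
    }

    void waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd(); // a wakeup is already pending: skip the wait entirely
            return;
        }
        waitPoint.reset(); // re-arm the latch for this round
        try {
            waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    protected void onWaitEnd() {
        waitEnds++;
    }
}
```

A wakeup that arrives before the thread starts waiting is not lost: the next `waitForRunning` call returns immediately.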

Characteristics of synchronous and asynchronous flushing

Synchronous flush

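  • High data reliability
  • Lower performance than asynchronous flushing; according to the official figures, by roughly 10%
  • Suitable for scenarios with high data-reliability requirements, such as finance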

Asynchronous flush

  • Higher broker performance and throughput
  • Lower client latency
  • A small number of messages may be lost if the broker shuts down abnormally

Why TransientStorePool exists

It is an optimization RocketMQ applies to message reads and writes. The following summary comes from Hu Zongtang of the community.

There are generally two ways to read and write:

The first: Mmap + PageCache

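Both reading and writing messages go through the page cache. Because reads and writes share the page cache, lock contention is unavoidable, and under concurrent reads and writes you get page faults, memory locking, and write-back of dirty pages.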

The second: a two-layer architecture of DirectByteBuffer (off-heap memory) + PageCache

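This separates reads from writes: messages are written into the DirectByteBuffer (off-heap memory), while reads go through the page cache. (With a DirectByteBuffer, flushing takes two steps: first into the page cache, then into the disk file.) The benefit is that many memory operations that tend to block are avoided, which lowers latency: fewer page faults, less memory locking, and less dirty-page write-back.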
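The off-heap buffers in the second approach come from the TransientStorePool. The RocketMQ pool is understood to allocate a fixed number of direct buffers, each the size of one CommitLog file, at startup and to lock them in memory (mlock) so they are not swapped out. The sketch below keeps only the borrow/return bookkeeping and omits memory locking; `SimpleTransientStorePool` is hypothetical, and although `borrowBuffer`/`returnBuffer` echo TransientStorePool's method names, this is not its code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified pool of pre-allocated off-heap write buffers (no mlock).
class SimpleTransientStorePool {
    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers = new ArrayDeque<>();

    SimpleTransientStorePool(int poolSize, int fileSize) {
        this.poolSize = poolSize;
        this.fileSize = fileSize;
    }

    // Allocate every buffer up front so the write path never allocates off-heap memory.
    synchronized void init() {
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offerLast(ByteBuffer.allocateDirect(fileSize));
        }
    }

    // Hand a whole-file buffer to a new file; null when the pool is exhausted.
    synchronized ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    // Reset position/limit and put the buffer back, most recently used first.
    synchronized void returnBuffer(ByteBuffer buffer) {
        buffer.clear();
        availableBuffers.offerFirst(buffer);
    }

    synchronized int available() {
        return availableBuffers.size();
    }
}
```

Because the buffers are allocated once and reused, the write path avoids both allocating off-heap memory and taking page faults on fresh mmap pages.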