
RocketMQ Source Code Analysis: Message Storage Design Overview

In this chapter

  • How commitlog files are organized on disk and the basic message storage format
  • Why consumequeue files exist, how they are organized, and how their data is laid out
  • Why index files exist and their underlying storage structure (how the hash index is implemented)

Preface

The chapter "RocketMQ Source Code Analysis: Message Sending" showed that messages are sent over Netty, with the target queue selected by round-robin by default.

http://img.cana.space/picStore/20201128214425.png

So once a message reaches the broker, how is it persisted, and how do consumers consume it?

Questions to ponder

  1. When the number of topics grows past 100+, a single Kafka broker's TPS drops by an order of magnitude. Why does RocketMQ keep a high TPS even with a massive number of topics?
  2. How do "random reads" of the CommitLog affect performance?

Message Storage Technology Choices

  • Distributed KV stores (LevelDB, RocksDB, Redis)

  • NewSQL stores (TiDB)

  • File system (RocketMQ, Kafka, RabbitMQ)

    A well-designed file-system store substantially outperforms the two options above, because it can write sequentially.

The rest of this chapter shows how RocketMQ uses the file system to store and consume messages efficiently.

CommitLog

About CommitLog

Persisting a message means appending it to a log file on disk; in RocketMQ this log is called the CommitLog.

Log file layout: RocketMQ vs. Kafka

http://img.cana.space/picStore/20201128215441.png

In Kafka, every queue (partition) of every topic has its own log file. In RocketMQ, all queues share one log: each broker has a single commitlog directory, and the log is split into many smaller segment files (1 GB each by default).

CommitLog Storage Layout

http://img.cana.space/picStore/20201128220747.png

A MappedFileQueue manages the set of commitlog segment files. The 10240-byte segment size in the figure above was used for a local test; the default is 1 GB.
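The segment files are named after the physical offset of their first byte, zero-padded to 20 digits, so locating a message from its physical offset is simple arithmetic. The helper below is a minimal sketch of that scheme (the class and method names are mine, not RocketMQ's):

```java
public class CommitLogFileName {
    // Default commitlog segment size: 1 GB.
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // A segment is named by the physical offset of its first byte,
    // zero-padded to 20 digits, e.g. 00000000000000000000, 00000000001073741824.
    static String fileNameFor(long physicalOffset) {
        long fileFromOffset = physicalOffset - physicalOffset % FILE_SIZE;
        return String.format("%020d", fileFromOffset);
    }

    // Byte position of the message inside its segment file.
    static long positionInFile(long physicalOffset) {
        return physicalOffset % FILE_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0L));              // 00000000000000000000
        System.out.println(fileNameFor(1073741830L));     // 00000000001073741824
        System.out.println(positionInFile(1073741830L));  // 6
    }
}
```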

Message Format in the CommitLog

http://img.cana.space/picStore/20201128220858.png

  • QueueOffset: the message's logical offset (its index within the ConsumeQueue)
  • PhysicalOffset: the message's physical offset (its byte position within the CommitLog)
  • properties: RocketMQ messages are extended via properties, e.g. uniqueKey, tags, keys, and so on
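A message's total on-disk size is its fixed-size header fields plus three variable-length parts (body, topic, properties). The sketch below mirrors the length computation that CommitLog.calMsgLength performs in this version of the code; the field list matches the 17 fields written in the doAppend walkthrough later, but the standalone class is mine:

```java
public class MsgLength {
    // Fixed-size fields plus the three variable-length parts
    // (body, topic, properties); field order follows the on-disk format.
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4                      // 1  TOTALSIZE
            + 4                       // 2  MAGICCODE
            + 4                       // 3  BODYCRC
            + 4                       // 4  QUEUEID
            + 4                       // 5  FLAG
            + 8                       // 6  QUEUEOFFSET
            + 8                       // 7  PHYSICALOFFSET
            + 4                       // 8  SYSFLAG
            + 8                       // 9  BORNTIMESTAMP
            + 8                       // 10 BORNHOST (IPv4 + port)
            + 8                       // 11 STORETIMESTAMP
            + 8                       // 12 STOREHOSTADDRESS (IPv4 + port)
            + 4                       // 13 RECONSUMETIMES
            + 8                       // 14 Prepared Transaction Offset
            + 4 + bodyLength          // 15 BODY: 4-byte length + body
            + 1 + topicLength         // 16 TOPIC: 1-byte length + topic
            + 2 + propertiesLength;   // 17 PROPERTIES: 2-byte length + properties
    }

    public static void main(String[] args) {
        System.out.println(calMsgLength(0, 0, 0));    // 91: the fixed overhead
        System.out.println(calMsgLength(100, 9, 30)); // 230
    }
}
```

The 1-byte topic length and 2-byte properties length explain the limits checked in putMessage below: topic names are capped at Byte.MAX_VALUE and the properties string at Short.MAX_VALUE.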

RocketMQ Storage Architecture

http://img.cana.space/picStore/20201128215748.png

  1. Producers append to the CommitLog sequentially.

  2. As messages are written, the ConsumeQueue logical queues are built asynchronously.

    Each ConsumeQueue entry contains 3 parts:

    • CommitLogOffset: the message's physical position
    • MessageSize: the message's size
    • TagHashCode: the hash code of the message's tag, used for fast filtering

    A ConsumeQueue has 3 important offsets:

    • minOffset: 0 if the queue has never been cleaned up; after cleanup it advances past the deleted entries
    • consumeOffset: the consumption progress
    • maxOffset: the ConsumeQueue offset where a newly persisted message lands
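These offsets are logical entry indexes, not byte positions, so simple arithmetic on them answers common questions such as how far behind a consumer is, or whether a requested offset is still available on disk. A minimal sketch (helper names are mine):

```java
public class QueueOffsets {
    // All three ConsumeQueue offsets are logical entry indexes, not bytes.
    // Invariant: minOffset <= consumeOffset <= maxOffset.

    // Messages written to this queue but not yet consumed (consumer lag).
    static long lag(long consumeOffset, long maxOffset) {
        return maxOffset - consumeOffset;
    }

    // A fetch is only valid for offsets still present on disk
    // (entries below minOffset have been cleaned up).
    static boolean inRange(long offset, long minOffset, long maxOffset) {
        return offset >= minOffset && offset < maxOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(95, 100));          // 5
        System.out.println(inRange(95, 10, 100));  // true
        System.out.println(inRange(5, 10, 100));   // false
    }
}
```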

RocketMQ Storage Layers

http://img.cana.space/picStore/20201128221346.png

  • Business/network layer
    • SendMessageProcessor: incoming requests are dispatched to the processor that handles them
    • QueryMessageProcessor
    • DefaultMessageStore: the core entry point of the storage layer
  • Storage logic layer
  • Storage I/O layer

Source Code Walkthrough

// Thread pool that handles send-message requests
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendProcessor, this.sendMessageExecutor);

Registering the processor: org.apache.rocketmq.broker.BrokerController#registerProcessor

...
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
...

Processing Flow

  • SendMessageProcessor.processRequest

  • DefaultMessageStore.putMessage(MessageExtBrokerInner msg)

  • CommitLog.putMessage(MessageExtBrokerInner msg)

    • Store into a MappedFile managed by the MappedFileQueue

      The two services below are covered in a later chapter:

    • Synchronous flush: GroupCommitService (a dedicated thread)

    • Asynchronous flush: CommitRealTimeService/FlushCommitLogService (dedicated threads)

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    // Opaque identifies the RPC sequence number
    response.setOpaque(request.getOpaque());

    // Extension points; not yet implemented in the open-source version
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // The broker can be configured to wait a while after startup before accepting send requests
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
    // Check whether the message is valid
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    // If no queue id was specified, pick one at random
    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // Build the broker-internal message representation
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    // Handle retry/DLQ: after 16 failed retries, a dead-letter topic is created and the message is redirected to it
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    // Populate the fields of msgInner
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    // Extension properties such as uniqueKey, keys, and tags live here
    msgInner.setPropertiesString(requestHeader.getProperties());
    // Time the message was created on the client
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    // Address of the sender
    msgInner.setBornHost(ctx.channel().remoteAddress());
    // Address of the broker that stores this message
    msgInner.setStoreHost(this.getStoreHost());
    // For retried messages, the reconsume count
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // Handle transactional messages
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        // Handle the prepare (half) message of a transaction
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // Handle ordinary messages and transaction commit/rollback messages
        // putMessage is the entry point of the whole storage path
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }

    // Build the response for the client
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck

protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {

    // Topic permissions: reads and writes can be disabled, which is very useful when operating RocketMQ, e.g. scaling ordered-message topics in or out
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }
    // Must not clash with the system default topic AUTO_CREATE_TOPIC_KEY_TOPIC (TBW102)
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }

    // Look up the topic's configuration
    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            // RETRY_GROUP_TOPIC_PREFIX marks topics used for message retries
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }

        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        // Step into this method, shown next
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),
            requestHeader.getDefaultTopic(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
		...
}

org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageMethod

public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
    TopicConfig topicConfig = null;
    boolean createNew = false;

    try {
        if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;

                // The topic does not exist yet; if auto topic creation is enabled, derive its config from the default topic's config
                TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                if (defaultTopicConfig != null) {
                    if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                            defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                        }
                    }

                    if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                        topicConfig = new TopicConfig(topic);

                        int queueNums =
                            clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
                                .getWriteQueueNums() : clientDefaultTopicQueueNums;

                        if (queueNums < 0) {
                            queueNums = 0;
                        }

                        // Copy over the relevant settings
                        topicConfig.setReadQueueNums(queueNums);
                        topicConfig.setWriteQueueNums(queueNums);
                        int perm = defaultTopicConfig.getPerm();
                        perm &= ~PermName.PERM_INHERIT;
                        topicConfig.setPerm(perm);
                        topicConfig.setTopicSysFlag(topicSysFlag);
                        topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                    } else {
                        log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                            defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                    }
                } else {
                    log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
                        defaultTopic, remoteAddress);
                }

                if (topicConfig != null) {
                    log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
                        defaultTopic, topicConfig, remoteAddress);

                    this.topicConfigTable.put(topic, topicConfig);

                    this.dataVersion.nextVersion();

                    createNew = true;

                    this.persist();
                }
            } finally {
                this.lockTopicConfigTable.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageMethod exception", e);
    }

    // Register the newly created topic and sync the topic info to the slave
    if (createNew) {
        this.brokerController.registerBrokerAll(false, true,true);
    }

    return topicConfig;
}

org.apache.rocketmq.store.DefaultMessageStore#putMessage

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // A slave broker does not accept messages from clients
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // The broker also has flags that can disable writes; when not writeable, client messages are rejected
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }

    // Topic length limit
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // Limit on the length of the message's extension properties
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    // If flushing messages to the OS page cache is taking too long, the broker degrades and rejects the write
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    // From the business layer into the storage logic layer: putMessage is the core storage path
    long beginTime = this.getSystemClock().now();
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

The core storage path: org.apache.rocketmq.store.CommitLog#putMessage

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Continue building the broker-internal message
    // Set the storage time
    // Store timestamp; note that it will be set again later, under the lock
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    // CRC checksum of the message body, to detect corruption or tampering
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    // Metrics on storage I/O latency; they can be collected and reported to a monitoring system
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delayed-delivery message handling
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Get the memory-mapped file (zero copy) of the most recent CommitLog segment.
    // One MappedFile corresponds to one CommitLog segment file; mappedFileQueue manages these consecutive segments.
    // For now, think of MappedFile as a high-performance disk I/O facade.
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // Multiple worker threads run putMessage in parallel, so a lock is required; the broker can be configured to use a reentrant lock or a spin lock
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Take the store timestamp again here, under the lock,
        // to guarantee a globally ordered sequence of store timestamps
        msg.setStoreTimestamp(beginLockTimestamp);

        // The latest CommitLog segment is full; create a new one
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        // Write the broker-internal message into the MappedFile's memory (not flushed to disk yet); see appendMessagesInner below
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                // This CommitLog segment is full; the message must be written into a new segment
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        // Release the lock
        putMessageLock.unlock();
    }

    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // At this point the message is in the page cache; what remains is flushing to disk
    // (synchronous or asynchronous flush).
    // Asynchronous flush has two modes: one goes through two layers (page cache and disk),
    // just like synchronous flush; with isTransientStorePoolEnable an extra buffer is added:
    // data goes to the TransientStorePool first, then the page cache, then disk.
    handleDiskFlush(result, putMessageResult, msg);
    // Master-slave replication
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

org.apache.rocketmq.store.MappedFile#appendMessagesInner

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // Current write position within this MappedFile
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // Which buffer is used depends on the flush mode (async flush itself has two modes to choose from)
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        // Handle a single message
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            // Handle a batch of messages
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // Advance the write position
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

Writing the message into the page cache: org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    /**
     * wroteOffset: the absolute physical offset of the message
     * fileFromOffset: the base offset of this commitlog segment (one MappedFile); the file name encodes this offset
     * byteBuffer.position(): the current write position within this MappedFile (one CommitLog segment)
     */
    long wroteOffset = fileFromOffset + byteBuffer.position();

    this.resetByteBuffer(hostHolder, 8);
    // Build a globally unique message id from the broker's store address and the message's absolute physical offset
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    // The number of messages already in this topic-queue; prepared here for building the ConsumeQueue later
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and rollback messages are not consumed and will not enter
        // the consume queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    /**
     * Serialize the message's properties and topic
     */
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    // Compute the message's total length, i.e. the space it occupies in the commitlog; bodyLength, topicLength and propertiesLength are the variable parts
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Reject the message if it is too large
    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    /**
     * Determine whether this CommitLog segment has enough free space.
     * maxBlank: the remaining space of this segment (its MappedFile).
     * Design rule: a message never spans two CommitLog segments.
     * Every segment reserves at least 8 bytes to mark the end of the file.
     */
    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        // Magic number marking the end of a CommitLog segment
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // Message storage format
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);

    // The broker-internal message is now fully prepared
    // 1 TOTALSIZE: 4 bytes for the total message length (capped at 4 MB, checked earlier)
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write the message into the MappedFile
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    // Build the result to return
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
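As an aside, the msgId built by MessageDecoder.createMessageId above is simply the 8-byte store host (4-byte IPv4 address + 4-byte port) followed by the 8-byte wroteOffset, hex-encoded into 32 characters. The sketch below reproduces that packing (the real method works on reusable ByteBuffers; the class and parameter names here are mine). It also shows why a broker can locate a message directly from its id: the physical offset is the last 16 hex characters.

```java
import java.nio.ByteBuffer;

public class MsgIdSketch {
    // Pack (IPv4, port, wroteOffset) into 16 bytes, then hex-encode
    // to get the familiar 32-character msgId.
    static String createMessageId(byte[] ipv4, int port, long wroteOffset) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ipv4);            // 4 bytes: broker IP
        buf.putInt(port);         // 4 bytes: broker port
        buf.putLong(wroteOffset); // 8 bytes: absolute physical offset
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Broker 10.0.0.1:10911, message at physical offset 1024
        System.out.println(createMessageId(new byte[] {10, 0, 0, 1}, 10911, 1024L));
        // -> 0A00000100002A9F0000000000000400
    }
}
```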

ConsumeQueue

Consumers cannot read the CommitLog directly; that would be far too inefficient. Instead, RocketMQ builds logical queues called ConsumeQueues that hold index data, so a consumer only reads its ConsumeQueue and can then locate the actual message quickly.

http://img.cana.space/picStore/20201128234423.png

ConsumeQueue存储索引数据,每条索引定长20字节,包含3部分

  • CommitLogOffset(8字节) 消息在CommitLog上的物理位置
  • MessageSize(4字节) 消息大小
  • TagHashCode(8字节) Tag的hashcode,用于消息快速过滤
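上面的三个字段可以用一段小例子演示其20字节的编码布局(示意代码,非RocketMQ源码,CqUnitDemo为假设的类名,字段顺序与源码一致):

```java
import java.nio.ByteBuffer;

// 示意:ConsumeQueue 单条索引(20 字节)的编码/解码
class CqUnitDemo {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 + 4 + 8

    // CommitLogOffset(8B) + MessageSize(4B) + TagHashCode(8B)
    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf.array();
    }

    static long[] decode(byte[] unit) {
        ByteBuffer buf = ByteBuffer.wrap(unit);
        return new long[]{buf.getLong(), buf.getInt(), buf.getLong()};
    }
}
```

消费时反向解码,即可拿到消息在CommitLog上的物理位置和大小。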

ConsumeQueue存储示例

http://img.cana.space/picStore/20201128235005.png

一条索引占用20个字节,本案例中一个ConsumeQueue文件正好存13条消息的索引(13 × 20 = 260字节)。与CommitLog一样,每个ConsumeQueue文件对应一个MappedFile,同一个Queue下的所有文件同样由MappedFileQueue管理
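由于每条索引定长20字节,给定逻辑偏移cqOffset就能直接算出它落在哪个文件、文件内什么位置(示意代码,CqLocateDemo为假设的类名;unitsPerFile即单个文件可容纳的条目数,本文测试场景为13,线上默认为30万):

```java
// 示意:根据逻辑偏移 cqOffset 定位索引条目所在文件与文件内位置
class CqLocateDemo {
    static final int UNIT_SIZE = 20;

    // 返回 {文件序号, 文件内字节偏移}
    static long[] locate(long cqOffset, int unitsPerFile) {
        long byteOffset = cqOffset * UNIT_SIZE;          // 全局字节偏移
        long fileSize = (long) unitsPerFile * UNIT_SIZE; // 单个文件大小
        return new long[]{byteOffset / fileSize, byteOffset % fileSize};
    }
}
```

例如本文场景下,第13条索引(cqOffset=13)正好是第2个文件的第0个字节,这也是定长设计带来的O(1)寻址能力。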

消费进度文件

Config/consumeOffset.json

http://img.cana.space/picStore/20201128235149.png

消费者订阅文件

Config/subscription.json

主要是用来存储消费者消费时的一些配置

http://img.cana.space/picStore/20201128235312.png

CommitLog和ConsumeQueue存储很类似,可以对比学习,如下:

http://img.cana.space/picStore/20201128235842.png

ConsumeQueue处理流程

ReputMessageService.doReput(独立线程),循环来构建ConsumeQueue

  • CommitLogDispatcherBuildConsumeQueue.dispatch(DispatchRequest request) 构建入口
  • ConsumeQueue.putMessagePositionInfo
    • 构建供消费端使用的逻辑队列数据
  • FlushConsumeQueueService.doFlush()
    • 独立线程异步刷盘:构建结果先写入ConsumeQueue的MappedFile(内存映射),所以跟CommitLog一样也需要刷盘落地

构建ConsumeQueue源码分析

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput

/**
 * 主要是构建ConsumeQueue和Index
 * reputFromOffset:之前构建ConsumeQueue/Index的进度
 * result:从reputFromOffset到CommitLog当前已落盘位置之间的这段数据,
 *         也就是尚未构建ConsumeQueue、Index的Message
 */
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
    try {
        this.reputFromOffset = result.getStartOffset();

        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
            // 读取一个完整的Message(commitLog中未构建的第一个消息),返回DispatchRequest对象
            DispatchRequest dispatchRequest =
                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    // 执行构建
                    DefaultMessageStore.this.doDispatch(dispatchRequest);
...

org.apache.rocketmq.store.DefaultMessageStore#doDispatch

public void doDispatch(DispatchRequest req) {
    // dispatcherList,分别是CommitLogDispatcherBuildConsumeQueue构建器和CommitLogDispatcherBuildIndex构建器
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo

/**
 * @param offset 需要重构ConsumeQueue的Message的CommitLog的物理位置
 * @param size Message大小
 * @param tagsCode Message的TagCode
 * @param cqOffset 消息队列的逻辑偏移
 * @return
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    // nio写入三个参数
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    // 根据期望的逻辑偏移位置找到对应某个ConsumeQueue文件的MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        /**
         * 如果mappedFileQueue的MappedFile List被清除
         * 需要保证消息队列的逻辑位置和ConsumeQueue文件的起始文件和偏移保持一致,要补充空的逻辑消息
         */
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
          // 重要:当前逻辑偏移是根据文件起始偏移+mappedFile写的偏移计算出来的,此时消息还没从内存刷到磁盘,如果是异步刷盘,broker断电就会存在数据丢失的情况,此时消费者消费不到,所以在重要业务中使用同步刷盘确保数据不丢失
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

Index

http://img.cana.space/picStore/20201129003432.png

Index文件以创建时的时间戳命名

http://img.cana.space/picStore/20201129003450.png

每条索引条目20字节:KeyHash(4字节,索引key的hash值)、PhyOffset(8字节,消息在CommitLog的物理偏移)、TimeDiff(4字节,相对文件起始时间的秒差)、PrevIndex(4字节,指向同一Hash桶冲突链表的上一个节点)。文件整体由40字节的IndexHeader、500万个Hash槽和2000万个索引条目组成。
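下文putKey源码中的两处寻址计算可以抽出来单独看(示意代码,IndexLayoutDemo为假设的类名;各区段大小取RocketMQ默认布局):

```java
// 示意:IndexFile 内部寻址,文件 = Header + 全部 Hash 槽 + 顺序追加的索引条目
class IndexLayoutDemo {
    static final int HEADER_SIZE = 40;     // IndexHeader 固定 40 字节
    static final int HASH_SLOT_SIZE = 4;   // 每个 Hash 槽存一个 int:该槽最新条目的序号
    static final int INDEX_UNIT_SIZE = 20; // keyHash(4) + phyOffset(8) + timeDiff(4) + prevIndex(4)

    // Hash 槽的绝对位置:头部之后按槽号排列
    static int absSlotPos(int keyHash, int hashSlotNum) {
        int slotPos = Math.abs(keyHash) % hashSlotNum;
        return HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
    }

    // 索引条目的绝对位置:头部 + 全部 Hash 槽之后,按写入序号顺序追加
    static int absIndexPos(int hashSlotNum, int indexCount) {
        return HEADER_SIZE + hashSlotNum * HASH_SLOT_SIZE + indexCount * INDEX_UNIT_SIZE;
    }
}
```

注意条目区是纯追加的,Hash槽里只存"最新条目的序号",冲突靠条目尾部的PrevIndex串链表解决。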

源码分析

索引构建流程

ReputMessageService.doReput(独立线程),和ConsumeQueue使用同一个线程处理,但是使用独立的dispatcher

  • CommitLogDispatcherBuildIndex.dispatch(DispatchRequest request)

  • IndexService.buildIndex(DispatchRequest req)

    构建供消息查询系统使用的消息索引数据

org.apache.rocketmq.store.index.IndexService#buildIndex

public void buildIndex(DispatchRequest req) {
    // IndexFile对应一个MappedFile
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        // keys来源于Message的Properties
        String keys = msg.getKeys();
        // 已经构建过索引则直接返回
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }

        /**
         * Index会构建两种索引,分别是对UniqKey和keys
         */

        if (req.getUniqKey() != null) {
            // UniqKey是客户端生成的MessageId,也来源于Message的properties
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }

        // 对业务端传的keys构建索引
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}

org.apache.rocketmq.store.index.IndexFile#putKey

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // IndexFile还没有写满
    if (this.indexHeader.getIndexCount() < this.indexNum) {

        // 根据topic-key或者topic-uniqKey计算Hash值
        int keyHash = indexKeyHashMethod(key);
        // Hash桶的位置
        int slotPos = keyHash % this.hashSlotNum;
        // 计算在内存中的位置
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            // 该Hash桶上是否已经有了数据,如果有数据的话,需要记录下来,为后面构建LinkList做准备
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }

            // 落地的时间-当前索引文件的起始时间,提高查询效率
            // timeDiff
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }

            // 计算索引数据需要放在哪个位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;

            // 存放根据topic-keys或者topic-uniqKey计算的hash值
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            // Message在CommitLog的物理位置
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            // 落地时间-当前索引文件的起始时间
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            // 在索引数据域要把刚才有冲突的Hash桶的位置记录下来,这样就构建成了一个LinkList
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            // 更新Hash桶的索引位置,如果有冲突,刚才我们已经记录下来
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            // 更新indexHeader信息
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}
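putKey只负责写入;查询(源码中的IndexFile#selectPhyOffset)则是反过来沿冲突链表回溯。下面用一个内存版的迷你索引把写入、查询两步串起来(示意代码,MiniIndexDemo为假设的类名,槽数与容量被刻意缩小,timeDiff省略):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// 示意:内存版迷你索引,结构同 IndexFile(Hash 槽 + 条目链表)
class MiniIndexDemo {
    static final int SLOT_NUM = 16, SLOT_SIZE = 4, UNIT_SIZE = 20, HEADER = 4;
    private final ByteBuffer buf = ByteBuffer.allocate(HEADER + SLOT_NUM * SLOT_SIZE + 64 * UNIT_SIZE);
    private int indexCount = 1; // 0 保留作 invalidIndex(空槽标记)

    void put(int keyHash, long phyOffset) {
        int absSlotPos = HEADER + (Math.abs(keyHash) % SLOT_NUM) * SLOT_SIZE;
        int slotValue = buf.getInt(absSlotPos);  // 槽里原有的条目序号,即冲突链表的旧表头
        int absIndexPos = HEADER + SLOT_NUM * SLOT_SIZE + indexCount * UNIT_SIZE;
        buf.putInt(absIndexPos, keyHash);
        buf.putLong(absIndexPos + 4, phyOffset);
        buf.putInt(absIndexPos + 12, 0);         // timeDiff,示例中省略
        buf.putInt(absIndexPos + 16, slotValue); // prevIndex:指向链表上一个节点
        buf.putInt(absSlotPos, indexCount++);    // 槽更新为指向最新节点
    }

    List<Long> get(int keyHash) {
        List<Long> offsets = new ArrayList<>();
        int absSlotPos = HEADER + (Math.abs(keyHash) % SLOT_NUM) * SLOT_SIZE;
        for (int idx = buf.getInt(absSlotPos); idx > 0; ) {
            int absIndexPos = HEADER + SLOT_NUM * SLOT_SIZE + idx * UNIT_SIZE;
            if (buf.getInt(absIndexPos) == keyHash) // hash 相等只是候选,真正确认还要回 CommitLog 比对 key
                offsets.add(buf.getLong(absIndexPos + 4));
            idx = buf.getInt(absIndexPos + 16);     // 沿 prevIndex 回溯链表
        }
        return offsets;
    }
}
```

可以看到:Hash槽永远指向最新写入的条目,同槽的旧条目被prevIndex串成链表,查询返回的是"从新到旧"的物理偏移列表。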

消息查询

两个概念

  • broker生成的消息ID

  • 客户端SDK生成的UniqueKey

如何查询消息

按照MessageID查询(依赖CommitLog)

  • MQAdminImpl.viewMessage(String MessageId)

    messageId中存储着broker的地址信息和消息在CommitLog上的物理偏移,所以可以解析后调用下面这个方法

    • MQAdminImpl.viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
      • QueryMessageProcessor.viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
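按源码MessageDecoder生成msgId的思路,broker侧的msgId(32位十六进制字符串)由broker IP(4字节)+ 端口(4字节)+ CommitLog物理偏移(8字节)拼接而成,解码过程大致如下(示意代码,MsgIdDecodeDemo为假设的类名,仅处理IPv4):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

// 示意:解码 broker 生成的 msgId
class MsgIdDecodeDemo {
    // 返回 {broker 地址, CommitLog 物理偏移}
    static Object[] decode(String msgId) {
        byte[] bytes = new byte[16];
        for (int i = 0; i < 16; i++) {
            bytes[i] = (byte) Integer.parseInt(msgId.substring(i * 2, i * 2 + 2), 16);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] ip = new byte[4];
        buf.get(ip);                    // 前 4 字节:broker IP
        int port = buf.getInt();        // 中间 4 字节:端口
        long phyOffset = buf.getLong(); // 后 8 字节:CommitLog 物理偏移
        String host = (ip[0] & 0xFF) + "." + (ip[1] & 0xFF) + "."
                + (ip[2] & 0xFF) + "." + (ip[3] & 0xFF);
        return new Object[]{new InetSocketAddress(host, port), phyOffset};
    }
}
```

拿到地址和物理偏移后,客户端就能直连对应broker,从CommitLog的指定位置读出整条消息。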

按照uniqueKey/Keys查询(依赖Index和CommitLog)

  • MQAdminImpl.queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey)
    • MQAdminImpl.queryMessage(final String addr, final QueryMessageRequestHeader requestHeader, final long timeoutMillis, final InvokeCallback invokeCallback, final Boolean isUnqiueKey)
      • QueryMessageProcessor.queryMessage(ChannelHandlerContext ctx, RemotingCommand request)

其他文件

lock

DefaultMessageStore.start()

有时候一台机器上会起多个broker,如果多个broker误配置了同一个数据目录,后启动的broker会因为拿不到lock文件上的文件锁而启动失败,提示你换一个目录,防止相互冲突。
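这个互斥靠的是文件锁,思路与DefaultMessageStore.start()中对store目录下lock文件tryLock的做法一致(示意代码,StoreLockDemo为假设的类名):

```java
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

// 示意:对 store 目录下的 lock 文件加独占文件锁,实现 broker 间的目录互斥
class StoreLockDemo {
    static FileLock tryLockStore(String lockFilePath) {
        try {
            RandomAccessFile raf = new RandomAccessFile(lockFilePath, "rw");
            FileChannel channel = raf.getChannel();
            // tryLock 非阻塞:锁已被其他进程持有时返回 null
            FileLock lock = channel.tryLock(0, 1, false);
            if (lock == null || lock.isShared()) {
                throw new RuntimeException("Lock failed, MQ already started: " + lockFilePath);
            }
            return lock; // 持有到进程退出或显式 release
        } catch (java.io.IOException e) {
            throw new RuntimeException("Failed to lock " + lockFilePath, e);
        }
    }
}
```

文件锁由操作系统维护,进程异常退出时会自动释放,所以不会出现broker崩溃后目录被永久锁死的问题。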

abort

getAbortFile

broker安全退出时会删除这个文件;异常退出(如进程被kill、机器断电)时文件会残留,下次启动检测到它,就会进入异常恢复流程。恢复的起点由下面的checkpoint决定。

checkpoint

StoreCheckpoint,定期记录CommitLog刷盘以及根据CommitLog构建ConsumeQueue和Index的时间点;异常恢复时,从这个时间点开始重新构建之后的数据。