
RocketMQ Source Code Analysis: Message Consumption Overview

Preface

Message Sending Flow

http://img.cana.space/picStore/20201130160007.png

  • Message sending: writing to the CommitLog is a sequential-write process and must be done under a lock, otherwise messages could be written out of order.

  • With asynchronous sending, once the message reaches the page cache the producer can be told the send succeeded. RocketMQ has a reput-message thread (ReputMessageService) that, in a while loop, dispatches messages from the CommitLog to the ConsumeQueue and Index files, using a classic polling approach.

  • Three important offsets

    • Queue Offset: the message's sequence number within its queue, 0, 1, 2, ...
    • Logic Offset: each index entry in the ConsumeQueue has a fixed length of 20 bytes, so the entry for Queue Offset 0 starts at 0, the second at 20, the third at 40, and so on; it is simply the Queue Offset multiplied by 20.
    • Physical Offset: the message's global offset within the CommitLog.
  • Structure of a ConsumeQueue index entry

    • commit log offset: the message's physical offset in the CommitLog
    • size: the length of the message
    • hashcode of tag: the hash code of the message's tag
  • How a ConsumeQueue entry locates the actual message in the CommitLog

    Taking the 5th message as an example: first use the commit log offset to find where the message starts in the CommitLog, then use the size to cut out exactly that many bytes to obtain the message. A minimal sketch of this lookup follows.
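A minimal, illustrative sketch of the lookup arithmetic described above (not the actual RocketMQ implementation; in reality the offsets are resolved across a set of memory-mapped files):

import java.nio.ByteBuffer;

// Illustrative only: locate a message in the CommitLog via its ConsumeQueue entry.
class ConsumeQueueLookup {
    static final int CQ_ENTRY_SIZE = 20; // 8-byte commit log offset + 4-byte size + 8-byte tag hashcode

    static byte[] readMessage(ByteBuffer consumeQueue, ByteBuffer commitLog, long queueOffset) {
        int logicOffset = (int) (queueOffset * CQ_ENTRY_SIZE);     // Logic Offset = Queue Offset * 20
        long commitLogOffset = consumeQueue.getLong(logicOffset);  // physical offset of the message
        int size = consumeQueue.getInt(logicOffset + 8);           // message length
        byte[] msg = new byte[size];
        ByteBuffer view = commitLog.duplicate();
        view.position((int) commitLogOffset);                      // real code resolves this across mapped files
        view.get(msg);
        return msg;
    }
}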

Production and Consumption Process

http://img.cana.space/picStore/20201130161503.png

The producer sends messages to the broker; they are written to the CommitLog and asynchronously dispatched to the ConsumeQueue, and the consumer consumes from the ConsumeQueue.

Dispatch Process

http://img.cana.space/picStore/20201130161700.png

Overview

  1. Pull vs Push

  2. Orderly vs Concurrently

    Strictly ordered consumption sacrifices a great deal of availability: each queue must be consumed by a single thread.

  3. Cluster vs Broadcast

    Cluster: the queues are split evenly among the consumers

    Broadcast: every consumer receives every message

Steps 4-7: how messages are consumed

  1. Pull messages

    First, messages are pulled from the broker to the consumer;

  2. Cache messages into TreeMap

    The messages are cached in a TreeMap (a red-black tree whose keys are ordered; using the offset as the key makes consumption naturally ordered);

  3. Consume messages

    A consumer thread pool consumes the messages (pulling and consuming are two separate processes, but this is transparent to the user);

  4. Report consumption progress

    The consumption progress is reported to the broker (this refers to Cluster mode; in Broadcast mode the progress can be kept locally or on the broker). A small sketch of choosing the mode follows the figure below.

http://img.cana.space/picStore/20201130162736.png
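A minimal sketch of choosing the message model on the consumer (DefaultMQPushConsumer and MessageModel are real client APIs; the group name is made up for illustration):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class MessageModelExample {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // hypothetical group
        consumer.setMessageModel(MessageModel.CLUSTERING);      // Cluster mode: progress is reported to and stored on the broker
        // consumer.setMessageModel(MessageModel.BROADCASTING); // Broadcast mode: progress is kept in a local file on the consumer
    }
}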

Rebalance

A rebalance runs periodically (every 20 s by default). For each queue allocated to this consumer it creates a PullRequest and puts it into pullRequestQueue (a LinkedBlockingQueue) for PullMessageService to consume; each time a PullResult comes back, the messages are put into a local cache queue for the consumer thread pool to consume.

public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            for (MessageExt msg : msgs) {
                // private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
                // the queue offset is the key, so iteration order equals consumption order
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                // ... (counting, unlocking and the return value are omitted here)

Pulling Messages

PullMessageService is a thread whose while loop keeps taking PullRequests from pullRequestQueue. When one is taken, some flow control is applied; if it passes, an asynchronous PullMessageRequest is sent to the broker together with a callback. When the asynchronous call completes, there are several possible outcomes:

  • FOUND: messages were pulled, so they are stored in the local cache and handed to another thread pool, ConsumeMessageConcurrentlyService, for consumption; pulling then continues (32 messages per pull by default) by calling executePullRequestImmediately again.
  • NO_NEW_MSG: if nothing was pulled (or nothing matched), the loop still continues; the broker, however, throttles this case: depending on whether the client has enabled long polling, it holds the request and notifies the client to pull again once messages arrive. Long polling is set aside for the moment (see the Long Polling section below).
  • NO_MATCHED_MSG

Consuming Messages

The messages are first wrapped into ConsumeRequests; consumeMessageBatchMaxSize controls how many messages go into one request, 1 by default. A small configuration sketch follows.
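A minimal sketch of the two batch-size knobs mentioned here and in the FOUND case above (these are real DefaultMQPushConsumer setters; the group name is made up):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class BatchSizeExample {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // hypothetical group
        consumer.setPullBatchSize(32);             // how many messages one pull request fetches (default 32)
        consumer.setConsumeMessageBatchMaxSize(1); // how many messages one ConsumeRequest hands to the listener (default 1)
    }
}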

Rebalance

Why have a cluster of consumers at all? To increase parallelism and thereby processing speed; like stream processing in Flink or Spark, it achieves a divide-and-conquer effect.

Two conditions trigger a rebalance. In most cases it is a change in the number of consumers; a change in the number of queues of a topic also triggers one. This chapter only covers rebalances caused by a change in the number of consumers.

http://img.cana.space/picStore/20201130164908.png

Starting from the stable state, add one more consumer: now one consumer per queue is a perfect fit.

http://img.cana.space/picStore/20201130165025.png

What if yet another consumer joins?

http://img.cana.space/picStore/20201130165056.png

The 5th consumer then has nothing to do, which is why, when there are few queues and many consumers, the compute resources still cannot be fully utilized.

Rebalance Code Time

Kafka performs its rebalance on a single coordinator node, whereas RocketMQ computes it on every consumer. How does RocketMQ keep this from descending into chaos?

MQClientInstance starts a rebalance task that sorts both the message queue list and the consumer list, so that during a rebalance the two sorted results (mqAll, and cid, the current consumer's position in the consumer list) can be passed as a consistent view into the rebalance method for computation. A sketch of the even-split allocation follows the figure below.

http://img.cana.space/picStore/20201130165324.png
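A simplified sketch in the spirit of AllocateMessageQueueAveragely (not a verbatim copy of that class): because every consumer sorts mqAll and cidAll identically, each one independently computes the same assignment, including which consumers end up empty-handed.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class AverageAllocationSketch {
    // mqAll and cidAll must already be sorted identically on every consumer.
    static <Q> List<Q> allocate(List<Q> mqAll, List<String> cidAll, String currentCid) {
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return Collections.emptyList();
        }
        int mod = mqAll.size() % cidAll.size();
        int average = mqAll.size() <= cidAll.size() ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
                                          : mqAll.size() / cidAll.size());
        int start = (mod > 0 && index < mod) ? index * average : index * average + mod;
        int range = Math.min(average, mqAll.size() - start);
        List<Q> allocated = new ArrayList<>();
        for (int i = 0; i < range; i++) {            // range <= 0 means this consumer gets no queue at all
            allocated.add(mqAll.get(start + i));
        }
        return allocated;
    }
}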

Pull Message Request Code time

http://img.cana.space/picStore/20201130181410.png

  • On the consumer side, once the PullMessageService thread takes a PullRequest it calls Netty's writeAndFlush. The method is asynchronous, which keeps this step very lightweight: once this code runs and the request has been sent out, the step is finished.
  • On the broker side an event triggers Netty's channelRead() method. channelRead is also implemented very lightly: it simply hands the pull request to a business thread pool.
  • The business thread (PullMessageThread) reads the contents of the PullRequest, looks up the ConsumeQueue by the logical offset to obtain the physical offset and size, slices the message out of the CommitLog, wraps it into a PullResult, and writes the PullResult back to the consumer via Netty's writeAndFlush.
  • On the client side the response is likewise handed to another business thread pool, the callback pool NettyClientPublicExecutor.
  • In the callback the messages are deserialized and put into the TreeMap, then taken from the TreeMap and handed to a consumption thread pool to be consumed.
  • The whole flow is rather convoluted, but the point is to keep the Netty threads lightweight.

Source Code Walkthrough

Entry point: org.apache.rocketmq.client.impl.consumer.PullMessageService#run

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // take a PullRequest from the blocking queue
            PullRequest pullRequest = this.pullRequestQueue.take();
            // issue the pull request
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

public void pullMessage(final PullRequest pullRequest) {
    // create the PullCallback
    // send the pull request to the broker
    // ... (flow control and the actual async call are omitted here)

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

Note: Netty's I/O thread pool does not handle business logic; requests are handed off to a business thread pool:

package org.apache.rocketmq.remoting.netty;
public abstract class NettyRemotingAbstract {
  /**
   * This container holds all processors per request code, aka, for each incoming request, we may look up the
   * responding processor in this map to handle the request.
   */
  protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
      new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
  
  
  public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
    if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    // here the request read off the channel is handed to the business thread pool;
                    // pair is a 2-tuple of requestCode -> (NettyRequestProcessor, ExecutorService);
                    // processors are registered at broker startup in BrokerController#registerProcessors(), e.g.
                    // this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
                }
            };

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
                // ... (rejection handling omitted)

In RocketMQ, NettyClientHandler and NettyServerHandler both extend SimpleChannelInboundHandler<RemotingCommand>:

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

The broker returns the PullResult and the consumer side processes it:

org.apache.rocketmq.client.consumer.PullCallback#onSuccess

public void onSuccess(PullResult pullResult) {
    if (pullResult != null) {
        // pre-processing: deserialize the messages and wrap them as MessageExt
        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
            subscriptionData);
        ...
        // FOUND: messages were returned
        //   save them into the local cache queue so they can be consumed
        //   set the next offset on the pullRequest and keep sending pull requests

After the messages are received successfully, the consumer thread pool consumes them:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // split the pulled messages (e.g. 32 of them) into batches of consumeBatchSize, one ConsumeRequest per batch (with the default batch size of 1, 32 messages become 32 ConsumeRequests)
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            // wrap into a ConsumeRequest (a Runnable); its run() method fetches the listener we registered and invokes it as a callback
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                // hand the ConsumeRequest to the consumer thread pool
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

If consumption fails, the message is retried by sending it back to the broker:

boolean result = this.sendMessageBack(msg, context);

The Consumer holds an MQClientInstance, which can both receive and send messages. A small listener sketch showing this retry path follows.
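A minimal sketch of a concurrent listener (real DefaultMQPushConsumer API; the group, topic, and business logic are made up): returning RECONSUME_LATER is what ultimately causes the failed messages to be sent back to the broker via sendMessageBack and redelivered later.

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class RetryListenerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // hypothetical group
        consumer.subscribe("demo_topic", "*");                                             // hypothetical topic
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    // business processing of msgs would go here
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // failed messages are sent back to the broker (sendMessageBack) and retried later
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
    }
}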

Pull Message Response

Each ConsumeQueue is read sequentially

http://img.cana.space/picStore/20201130194406.png

Each CommitLog is read randomly, straight from the page cache

http://img.cana.space/picStore/20201130195405.png

Fetching the Message

org.apache.rocketmq.store.DefaultMessageStore#getMessage

// fetch the message by its physical offset and length
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)

if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
    // consume too slow ,redirect to another machine
    if (getMessageResult.isSuggestPullingFromSlave()) {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    }
    // consume ok
    else {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
    }
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

Normally the hot data stays within the page cache, but if the region being consumed is an old part of the CommitLog that is no longer in the page cache, page faults occur and the operating system loads that part of the CommitLog back into the page cache, which hurts the memory utilization of that machine.

RocketMQ anticipated this problem; the solution is to suggest that such a consumer read from a slave instead.

Message Reads and Writes

http://img.cana.space/picStore/20201130203333.png

TransientStorePool is a useful tuning point, although it can lose messages; it is a classic read/write-split practice. With TransientStorePool enabled, writes go through the TransientStorePool while reads go through mmap. A small configuration sketch follows.
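A minimal sketch of turning this on (assuming the MessageStoreConfig setters below; in practice this is set in the broker's configuration file and only takes effect on an async-flush master):

import org.apache.rocketmq.store.config.MessageStoreConfig;

public class TransientStorePoolExample {
    public static void main(String[] args) {
        MessageStoreConfig storeConfig = new MessageStoreConfig();
        storeConfig.setTransientStorePoolEnable(true); // write into pooled off-heap buffers, commit to the FileChannel later
        storeConfig.setTransientStorePoolSize(5);      // number of pooled direct buffers (assumed default 5)
        // reads still go through the mmap'ed buffers, giving the read/write split described above
    }
}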

Long Polling

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)

case ResponseCode.PULL_NOT_FOUND:

    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // the broker performs the long polling: for example, wait up to 15 s before responding; if messages arrive within those 15 s, respond immediately, otherwise respond once the 15 s are up
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
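A minimal sketch of the broker-side switches seen above (assuming BrokerConfig's setters; normally these values live in broker.conf):

import org.apache.rocketmq.common.BrokerConfig;

public class LongPollingConfigExample {
    public static void main(String[] args) {
        BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setLongPollingEnable(true);     // hold PULL_NOT_FOUND requests via PullRequestHoldService
        brokerConfig.setShortPollingTimeMills(1000); // fallback wait (ms) when long polling is disabled
    }
}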

Consume Message

http://img.cana.space/picStore/20201130204013.png

Consume Message Code time

http://img.cana.space/picStore/20201130204058.png

Push-Consume Code time

http://img.cana.space/picStore/20201130204211.png

There are two ways a rebalance is woken up: either the broker detects a change in the consumers and wakes it up, or the rebalance service itself, which is a while loop, wakes up every 20 s.

If consumption is too slow, will continuous pulling blow up the red-black tree?

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

As the code shows, RocketMQ applies flow control here: once the red-black tree holds more than 1000 messages, the method returns without pulling again right away and the pull request is re-executed later instead. A small sketch of the related thresholds follows.
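A minimal sketch of the thresholds behind this flow control (assumed to be the DefaultMQPushConsumer setters below, shown with their assumed defaults; the group name is made up):

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class FlowControlExample {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group"); // hypothetical group
        consumer.setPullThresholdForQueue(1000);     // max cached message count per queue before pulling is deferred
        consumer.setPullThresholdSizeForQueue(100);  // max cached message size per queue, in MiB
    }
}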

Summary

Rebalance -> send a pull message request -> the callback puts the result into a red-black-tree cache -> another thread consumes the results from the cache.