Preface
Message Sending Flow

- Sending a message and storing it in the CommitLog is a sequential write, and it is done under a lock; otherwise messages could be written out of order.
- With asynchronous flush, the message only has to reach the page cache before the producer is told the send succeeded. RocketMQ has a ReputMessageService thread that, inside a while loop, dispatches messages from the CommitLog to the ConsumeQueue and Index files using a classic polling loop.
- Three important offsets (a small sketch of this arithmetic follows the list):
  - Queue Offset: the message's sequence number within the queue (0, 1, 2…).
  - Logic Offset: each index entry in the ConsumeQueue is a fixed 20 bytes, so the entry for Queue Offset 0 starts at byte 0, the next at 20, the next at 40…; simply multiply the Queue Offset by 20.
  - Physical Offset: the message's global offset within the CommitLog.
- Structure of a ConsumeQueue index entry:
  - commit log offset: physical offset of the message
  - size: length of the message
  - hashcode of tag: hashcode of the message's tag
- How to locate the actual message in the CommitLog from the ConsumeQueue: taking the 5th message as an example, first use its commit log offset to find where the message starts in the CommitLog, then slice out `size` bytes to get the whole message.
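To make the offset arithmetic concrete, here is a small self-contained sketch (not RocketMQ source; only the 20-byte entry layout of an 8-byte commit log offset, a 4-byte size and an 8-byte tag hashcode is taken from the format described above) of turning a Queue Offset into a ConsumeQueue position and decoding the entry:

```java
import java.nio.ByteBuffer;

public class ConsumeQueueIndexSketch {
    // each ConsumeQueue entry is fixed at 20 bytes:
    // 8-byte commit log offset + 4-byte message size + 8-byte tag hashcode
    static final int ENTRY_SIZE = 20;

    public static void main(String[] args) {
        long queueOffset = 4;                        // the 5th message (0-based)
        long logicOffset = queueOffset * ENTRY_SIZE; // byte position inside the ConsumeQueue file

        // pretend this 20-byte slice was read from the ConsumeQueue at logicOffset
        ByteBuffer entry = ByteBuffer.allocate(ENTRY_SIZE);
        entry.putLong(123456L).putInt(256).putLong("TagA".hashCode());
        entry.flip();

        long commitLogOffset = entry.getLong(); // physical offset in the CommitLog
        int size = entry.getInt();              // length of the message
        long tagsCode = entry.getLong();        // hashcode of the tag, used for filtering

        // the message is then the 'size' bytes starting at commitLogOffset in the CommitLog
        System.out.printf("logicOffset=%d, commitLogOffset=%d, size=%d, tagsCode=%d%n",
                logicOffset, commitLogOffset, size, tagsCode);
    }
}
```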
Producing and Consuming

The Producer sends messages to the broker; they are written to the CommitLog and asynchronously dispatched to the ConsumeQueue, and the Consumer consumes from the ConsumeQueue.
The dispatch process

Overview
- Pull vs Push
- Orderly vs Concurrently: strictly ordered consumption sacrifices a lot of availability, because each queue must be consumed by a single thread.
- Cluster vs Broadcast: in Cluster mode the queues are divided among the consumers; in Broadcast mode every consumer receives every message.
4-7: How messages are consumed
- Pull messages: first pull messages from the broker to the consumer.
- Cache messages into TreeMap: cache the messages in a TreeMap (a red-black tree whose keys are kept in order; using the offset as the key makes consumption naturally ordered, see the small sketch after this list).
- Consume messages: consume them with a consumer thread pool (pulling and consuming are two separate steps, but this is transparent to the user).
- Report consumption progress: report progress to the broker (this refers to Cluster mode; in Broadcast mode the progress can be kept either locally or on the broker).
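As a tiny illustration of the TreeMap point above, here is a plain-JDK sketch (not RocketMQ source) showing that keying the cache by offset makes iteration naturally ordered:

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetOrderSketch {
    public static void main(String[] args) {
        TreeMap<Long, String> msgTreeMap = new TreeMap<>();
        // messages may be cached out of order...
        msgTreeMap.put(7L, "msg-7");
        msgTreeMap.put(5L, "msg-5");
        msgTreeMap.put(6L, "msg-6");
        // ...but iteration always comes back sorted by offset: 5, 6, 7
        for (Map.Entry<Long, String> e : msgTreeMap.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
        // firstKey()/lastKey() also make the flow-control check shown later cheap
        System.out.println("min=" + msgTreeMap.firstKey() + ", max=" + msgTreeMap.lastKey());
    }
}
```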

Rebalance
A rebalance runs every 20s. For each queue allocated to this consumer a PullRequest is created and put into the pullRequestQueue (a LinkedBlockingQueue) for PullMessageService to consume; afterwards, every time a PullResult comes back, the messages are put into the local cache queue for the consumer thread pool to consume. ProcessQueue#putMessage is where the pulled messages land in that cache:
```java
public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            for (MessageExt msg : msgs) {
                // private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                // ...
```
Pulling messages
PullMessageService is a thread with a while loop that keeps taking PullRequests from the pullRequestQueue. After taking one it applies some flow control; if that passes, it asynchronously sends a PullMessageRequest to the broker with a callback. When the asynchronous call completes there are several possible outcomes (see the sketch after this list):
- FOUND: messages were pulled, so they are stored in the local cache, handed to the separate consume thread pool ConsumeMessageConcurrentlyService, and then pulling continues (32 messages per pull by default) by calling executePullRequestImmediately again.
- NO_NEW_MSG: nothing was pulled, so the loop continues, but the broker throttles this: depending on whether the client enabled long polling, it holds the request for a while and notifies the client to pull again once messages arrive. We will set long polling aside for now.
- NO_MATCHED_MSG: messages existed but none matched the subscription filter.
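The following is a condensed, self-contained sketch of how the callback branches on these statuses; the types are stand-ins, and the real handling lives in DefaultMQPushConsumerImpl's PullCallback shown later in the source walkthrough:

```java
public class PullStatusSketch {
    enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    static void onSuccess(PullStatus status, long nextBeginOffset) {
        switch (status) {
            case FOUND:
                // cache the messages locally, hand them to the consume thread pool,
                // then immediately schedule the next pull (default 32 messages per pull)
                System.out.println("cache + submitConsumeRequest, next=" + nextBeginOffset);
                break;
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                // nothing usable this round: advance the offset and keep pulling;
                // the broker may hold the request for a while (long polling, covered later)
                System.out.println("advance offset to " + nextBeginOffset + " and pull again");
                break;
            default:
                // OFFSET_ILLEGAL etc.: correct the offset before the next pull
                break;
        }
    }

    public static void main(String[] args) {
        onSuccess(PullStatus.FOUND, 32L);
        onSuccess(PullStatus.NO_NEW_MSG, 32L);
    }
}
```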
Consuming messages
The messages are first wrapped into a ConsumeRequest; consumeMessageBatchMaxSize controls how many messages go into one request and defaults to 1.
Rebalance
Why run a cluster of Consumers at all? To increase parallelism and therefore processing speed; like Flink, Spark and other stream-processing systems, it achieves a divide-and-conquer effect.
There are two triggers for a rebalance. In most cases it is a change in the number of Consumers; a change in the number of queues of a topic also triggers one. This chapter only discusses rebalances caused by Consumer count changes.

When things are stable, add one more consumer: now each queue maps to exactly one Consumer, which is just right.

What if yet another consumer joins?

The 5th Consumer then does nothing at all, which is why, when there are few queues and many Consumers, the extra compute resources still cannot be fully utilized.
Rebalance Code Time
Kafka performs the rebalance on a single coordinator node, whereas RocketMQ computes it on every Consumer. So how does RocketMQ avoid the consumers stepping on each other?
MQClientInstance starts a rebalance task and sorts both the message queue list and the Consumer list, so that when rebalancing, every consumer passes the same sorted views (mqAll, plus cid, the current consumer's position in the consumer list) into the allocation method and computes the same result.
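Below is a simplified sketch of the default averaging strategy (the real class is AllocateMessageQueueAveragely; this version condenses the same arithmetic and uses plain strings instead of MessageQueue objects):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AllocateAveragelySketch {
    // every consumer runs the same calculation on the same sorted views,
    // so the results never overlap and never leave a queue unassigned
    static List<String> allocate(String currentCID, List<String> cidAll, List<String> mqAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0) {
            return result;
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
                                      : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        List<String> consumers = Arrays.asList("c0", "c1", "c2", "c3", "c4");
        for (String cid : consumers) {
            // with 4 queues and 5 consumers, c4 gets nothing, matching the figure above
            System.out.println(cid + " -> " + allocate(cid, consumers, queues));
        }
    }
}
```

Because every consumer sorts the same inputs and runs the same pure function, no coordination node is needed.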

Pull Message Request Code time

- On the Consumer side, the PullMessageService thread takes a PullRequest and calls Netty's writeAndFlush. This method is asynchronous, so the whole step stays very lightweight: once this code runs and the request is sent out, the step is done.
- On the Broker side an event fires Netty's channelRead() method. channelRead() is also implemented to be lightweight: it simply hands the pull request to a business thread pool.
- The business thread (PullMessageThread) reads the contents of the PullRequest, goes to the ConsumeQueue to translate the logical offset into a physical offset and size, slices the message out of the CommitLog accordingly, and wraps it into a PullResult. It then returns the PullResult to the Consumer client via Netty's writeAndFlush.
- On the client side the response is likewise handed to another business thread pool, namely the callback thread pool NettyClientPublicExecutor.
- Inside the callback the messages are deserialized and put into the TreeMap, then taken from the TreeMap and handed to a consume thread pool for consumption.
- The whole process is convoluted, but the point is to keep the Netty threads lightweight.
Source Code Walkthrough
Entry point: org.apache.rocketmq.client.impl.consumer.PullMessageService#run
```java
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // take a PullRequest from the blocking queue
            PullRequest pullRequest = this.pullRequestQueue.take();
            // issue the pull request
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
```
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
```java
public void pullMessage(final PullRequest pullRequest) {
    // create the pull callback
    // ask the broker to pull data
    // ...
}
```
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessage
```java
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
```
Note: Netty's I/O thread pool does not process business logic; the request is handed off to a business thread pool.
```java
package org.apache.rocketmq.remoting.netty;

public abstract class NettyRemotingAbstract {
    /**
     * This container holds all processors per request code, aka, for each incoming request, we may look up the
     * responding processor in this map to handle the request.
     */
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    // the actual business processing of the request happens in here
                    // ...
                }
            };
            // This is where the read request is handed to the business thread pool. pair is a two-tuple of
            // (Processor, ExecutorService) looked up by requestCode; processors are registered at broker
            // startup in BrokerController#registerProcessors():
            // this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
                // ...
```
In RocketMQ, both NettyClientHandler (an inner class of NettyRemotingClient) and NettyServerHandler (an inner class of NettyRemotingServer) extend SimpleChannelInboundHandler<RemotingCommand>:
```java
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
}

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
}
```
The broker returns the PullResult and the consumer side processes it.
org.apache.rocketmq.client.consumer.PullCallback#onSuccess
```java
public void onSuccess(PullResult pullResult) {
    if (pullResult != null) {
        // pre-processing: deserialize the messages and wrap them as MessageExt
        pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
            subscriptionData);
        ...
        // case FOUND: save the messages into the local cache queue for consumption,
        // set the next offset on the pullRequest, and keep issuing pull requests
```
After the messages are received successfully, the consumer thread pool is used to consume them:
```java
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
```
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
```java
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // split the pulled batch into chunks of consumeBatchSize (default 1),
        // e.g. 32 pulled messages become 32 ConsumeRequests
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            // wrap into a ConsumeRequest, which is a Runnable; its run() method calls back
            // the listener we registered for the actual processing
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                // hand the consumeRequest to another thread pool for processing
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
```
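For reference, the listener that ConsumeRequest.run() ultimately calls back is the one registered on the consumer. A standard usage example of the public client API (the group name, topic and name server address below are placeholders):

```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ListenerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // business processing; returning RECONSUME_LATER triggers sendMessageBack (see below)
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
```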
If consumption fails, the message is retried by sending it back to the broker:
```java
boolean result = this.sendMessageBack(msg, context);
```
A Consumer holds an MQClientInstance, which can both receive and send messages.
Pull Message Response
Each ConsumeQueue is read sequentially.

The CommitLog is read at random offsets, served directly from the page cache.

Fetching the Message
org.apache.rocketmq.store.DefaultMessageStore#getMessage
```java
// get the message from the CommitLog using the physical offset and the message size
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
```
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
```java
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
    // consume too slow, redirect to another machine
    if (getMessageResult.isSuggestPullingFromSlave()) {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
    }
    // consume ok
    else {
        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
    }
} else {
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
```
Normally the hot data stays in the page cache. But if consumption falls back to an old region of the CommitLog that is no longer in the page cache, page faults occur and the operating system loads that old CommitLog region back into the page cache, which hurts the memory efficiency of that machine.
RocketMQ anticipated this problem; its solution is to suggest that such consumers read from the slave instead.
Message Reads and Writes

TransientStorePool (the transientStorePoolEnable option) is a good tuning point, though it can lose messages; it is a classic read/write separation practice: with TransientStorePool enabled, writes go through the TransientStorePool while reads go through mmap.
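A conceptual sketch of that read/write separation in plain Java NIO (not RocketMQ source, just the idea): writes are staged in an off-heap buffer and committed through the FileChannel, while reads go through an mmap'ed view of the same file.

```java
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class WriteReadSeparationSketch {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile("commitlog-demo", "rw");
             FileChannel channel = raf.getChannel()) {
            // "TransientStorePool"-style write buffer: off-heap, not backed by the file
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
            writeBuffer.put("hello".getBytes());
            writeBuffer.flip();
            // commit: flush the staged bytes into the file via the channel
            channel.write(writeBuffer, 0);
            channel.force(false);
            // read path: map the file and read through the page cache
            MappedByteBuffer readBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 5);
            byte[] out = new byte[5];
            readBuffer.get(out);
            System.out.println(new String(out)); // hello
        }
    }
}
```

The message-loss risk mentioned above comes from the staging buffer: bytes that have been acknowledged but not yet committed to the file are gone if the process dies.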
Long Polling
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
```java
case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // the broker performs the long polling: e.g. wait up to 15s before responding; if a message
        // arrives within those 15s, respond immediately, otherwise respond when the wait expires
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
```
Consume Message

Consume Message Code time

Push-Consume Code time

Rebalance is woken up in two ways: the broker notices a change in the consumer group and wakes the rebalance up, and the rebalance service itself is a while loop that wakes up every 20s.
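A minimal sketch of those two wake-up paths (a simplified stand-in for the real RebalanceService/ServiceThread pair, not the actual implementation):

```java
public class RebalanceLoopSketch extends Thread {
    private final Object waitPoint = new Object();
    private volatile boolean stopped = false;

    @Override
    public void run() {
        while (!stopped) {
            synchronized (waitPoint) {
                try {
                    // path 1: time-based, wake up every 20s by default
                    waitPoint.wait(20_000);
                } catch (InterruptedException ignored) {
                }
            }
            doRebalance();
        }
    }

    // path 2: event-based, the broker notifies the client that the consumer
    // group changed and the client wakes the loop up immediately
    public void wakeup() {
        synchronized (waitPoint) {
            waitPoint.notify();
        }
    }

    private void doRebalance() {
        // recompute the queue allocation here
    }

    public static void main(String[] args) throws InterruptedException {
        RebalanceLoopSketch service = new RebalanceLoopSketch();
        service.start();
        Thread.sleep(1000);
        service.wakeup(); // e.g. the broker reported a consumer joining the group
    }
}
```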
If consumption is too slow, will the continuous pulling blow up the red-black tree?
```java
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
```
As you can see, RocketMQ applies flow control here: if the red-black tree already holds more than pullThresholdForQueue entries (1000 by default), it returns right away and schedules the pull for later instead of continuing to pull asynchronously.
Summary
Rebalance -> send a pull message request -> the callback puts the results into a red-black tree cache -> another thread consumes from that cache.