目录

rocketmq源码分析_消息重试及过滤

消息重试

FAQ

What is going to happen when consumer listener returns “RECONSUME_LATER”?

消息重试流程

http://img.cana.space/picStore/20201211130538.png

Prepare

启动一个Consumer,订阅某个topic,这个订阅关系会通过心跳传到broker;Consumer会将订阅的这个topic复制一份,不仅订阅当前这个topic还会订阅一个叫做“%RETRY%ConsumerGroup”形式的topic,之后就可以正常的接受消息。

Pull Message

rebalance->发送pull message request->回调将结果放到一个红黑树缓存里->另一个线程会对缓存里的结果进行消费。

Backup

将topic放到一些变量中

通过offset找到原来的消息,根据原来的消息创建有延时的重试消息,注意重试放回去的消息已经不是原来接收的那个消息。

Persistence

将新创建的消息持久化到commitLog中,reput线程监听到最大offset变化会将消息加入到consumeQueue中

延时等级DelayLv

http://img.cana.space/picStore/20201212133934.png

在构建正常业务订阅关系时一并构建重试订阅关系

http://img.cana.space/picStore/20201212140713.png

重试次数由broker端进行+1

http://img.cana.space/picStore/20201212140821.png

将新创建的重试消息投递到延时队列中,默认初始值3

http://img.cana.space/picStore/20201212140956.png

http://img.cana.space/picStore/20201212141047.png

tagsCode设置成将要执行的时间点

http://img.cana.space/picStore/20201212141139.png

到了时间点之后会清除之前设置的延时等级,并且将topic改成真实的重试topic也就是“%RETRY%ConsumerGroup”,重新put这个message(持久化到commitlog,指派到consumequeue中),这样Consumer端基于原先的订阅关系就能拉取到这个重试消息。

http://img.cana.space/picStore/20201212142104.png

消息过滤

http://img.cana.space/picStore/20201212140317.png

订阅关系,tags放到一个set中

http://img.cana.space/picStore/20201212140421.png

是否包含指定tag方法

http://img.cana.space/picStore/20201212140531.png

拉取消息时进行过滤

http://img.cana.space/picStore/20201212140556.png

http://img.cana.space/picStore/20201212140608.png