rocketmq源码分析_消息重试及过滤
目录
消息重试
FAQ
What is going to happen when consumer listener returns “RECONSUME_LATER”?
消息重试流程
Prepare
启动一个Consumer,订阅某个topic,这个订阅关系会通过心跳传到broker;Consumer会将订阅的这个topic复制一份,不仅订阅当前这个topic还会订阅一个叫做“%RETRY%ConsumerGroup”形式的topic,之后就可以正常的接受消息。
Pull Message
rebalance->发送pull message request->回调将结果放到一个红黑树缓存里->另一个线程会对缓存里的结果进行消费。
Backup
将topic放到一些变量中
通过offset找到原来的消息,根据原来的消息创建有延时的重试消息,注意重试放回去的消息已经不是原来接收的那个消息。
Persistence
将新创建的消息持久化到commitLog中,reput线程监听到最大offset变化会将消息加入到consumeQueue中
延时等级DelayLv
在构建正常业务订阅关系时一并构建重试订阅关系
重试次数由broker端进行+1
将新创建的重试消息投递到延时队列中,默认初始值3
tagsCode设置成将要执行的时间点
到了时间点之后会清除之前设置的延时等级,并且将topic改成真实的重试topic也就是“%RETRY%ConsumerGroup”,重新put这个message(持久化到commitlog,指派到consumequeue中),这样Consumer端基于原先的订阅关系就能拉取到这个重试消息。
消息过滤
订阅关系,tags放到一个set中
是否包含指定tag方法
拉取消息时进行过滤