目录

rocketmq消息发送样例

开启远程调试

一、首先下载rocketmq服务端版本对应源码,例如我的是4.4.0

官方源码基本没注释,在github上找了一篇带注释的,https://github.com/mayday05/rocketmq

clone到本地,注意版本要一致git@github.com:mayday05/rocketmq.git

1
$ git clone git@github.com:JiangJibo/rocketmq.git rmq_note

二、在rocketmq集群搭建里已经开启了远程调试口子,并且做了端口映射,如下:

1
JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"

参数说明:

1
2
3
4
5
6
-Xdebug是通知JVM工作在DEBUG模式下
-Xrunjdwp是通知JVM使用(java debug wire protocol)来运行调试环境。该参数同时了一系列的调试选项:
transport指定了调试数据的传送方式,dt_socket是指用SOCKET模式,另有dt_shmem指用共享内存方式,其中,dt_shmem只适用于Windows平台。
address指定了调试端口
server参数是指是否支持在server模式的VM中.
suspend指明,是否在调试客户端建立起来后,再执行JVM

接下来只需要在idea中添加远程调试配置即可,如下:

http://img.cana.space/picStore/20201126142747.png

打断点,启动debug模式,控制台显示如下表示成功

http://img.cana.space/picStore/20201126142858.png

基本样例

导入MQ客户端依赖

1
2
3
4
5
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

消息发送者步骤分析

  1. 创建消息生产者producer,并制定生产者组名
  2. 指定Nameserver地址
  3. 启动producer
  4. 创建消息对象,指定主题Topic、Tag和消息体
  5. 发送消息
  6. 关闭生产者producer

消息消费者步骤分析

  1. 创建消费者Consumer,制定消费者组名
  2. 指定Nameserver地址
  3. 订阅主题Topic和Tag
  4. 设置回调函数,处理消息
  5. 启动消费者consumer

消息发送

发送同步消息

发送消息等待broker返回发送结果,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package space.cana.rmq.demo.base.producer;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

/**
 * 发送同步消息
 */
@Slf4j
public class SyncProducer {
    @SneakyThrows
    public static void main(String[] args) {
        // 1. 创建消息生产者,并指定生产者group
        DefaultMQProducer producer = new DefaultMQProducer("cana-producer-group");
        // 2. 指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        // 3. 启动producer
        producer.start();
        // 4. 创建消息对象,指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("cana-topic", "sync-tag", ("hello sync msg" + i).getBytes());
            // 发送消息
            SendResult sendResult = producer.send(msg);
            // 发送状态
            SendStatus sendStatus = sendResult.getSendStatus();
            // 消息id
            String msgId = sendResult.getMsgId();
            // 消息接受队列id
            int queueId = sendResult.getMessageQueue().getQueueId();
            log.info("发送结果:{}", sendResult);
        }
        // 6. 关闭生产者producer
        producer.shutdown();

    }
}

运行结果:

http://img.cana.space/picStore/20201126154655.png

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建消息生产者,并指定生产者group
        DefaultMQProducer producer = new DefaultMQProducer("cana-producer-group");
        // 2. 指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        // 3. 启动producer
        producer.start();

        // 4. 创建消息对象,指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("cana-topic", "async-tag", ("hello async msg" + i).getBytes());
            // 发送异步消息
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("发送成功,结果:{}", sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    log.info("发送异常", e);
                }
            });
        }
        // 这里不能直接shutdown(), 异步 的方式 直接就关闭了
//        producer.shutdown();
    }
}

运行结果:

1
2
3
4
2020-11-26 16:25:35 INFO  [NettyClientPublicExecutor_1] space.cana.rmq.demo.base.producer.AsyncProducer - 发送成功,结果SendResult [sendStatus=SEND_OK, msgId=C0A8010410247F31245A844555710000, offsetMsgId=C0A8010400002E8700000000000002EC, messageQueue=MessageQueue [topic=cana-topic, brokerName=broker-2, queueId=3], queueOffset=0]
2020-11-26 16:25:35 INFO  [NettyClientPublicExecutor_2] space.cana.rmq.demo.base.producer.AsyncProducer - 发送成功,结果SendResult [sendStatus=SEND_OK, msgId=C0A8010410247F31245A844555820001, offsetMsgId=C0A8010400002A9F0000000000000462, messageQueue=MessageQueue [topic=cana-topic, brokerName=broker-1, queueId=3], queueOffset=0]
2020-11-26 16:25:35 INFO  [NettyClientPublicExecutor_3] space.cana.rmq.demo.base.producer.AsyncProducer - 发送成功,结果SendResult [sendStatus=SEND_OK, msgId=C0A8010410247F31245A844555890006, offsetMsgId=C0A8010400002E8700000000000003A4, messageQueue=MessageQueue [topic=cana-topic, brokerName=broker-2, queueId=0], queueOffset=0]
...

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class OnewayProducer {
    @SneakyThrows
    public static void main(String[] args) {
        // 1. 创建消息生产者,并指定生产者group
        DefaultMQProducer producer = new DefaultMQProducer("cana-producer-group");
        // 2. 指定NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        // 3. 启动producer
        producer.start();
        // 4. 创建消息对象,指定主题Topic、Tag和消息体
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("cana-topic", "oneway-tag", ("hello oneway msg" + i).getBytes());
            // 发送消息
            producer.sendOneway(msg);
        }
        // 6. 关闭生产者producer
        producer.shutdown();
    }
}

运行结果:

1
2
3
4
2020-11-26 16:32:33 INFO  [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9877] result: true
2020-11-26 16:32:33 INFO  [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[192.168.1.4:10911] result: true
2020-11-26 16:32:33 INFO  [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[192.168.1.4:11911] result: true
...

在控制台查看消息

http://img.cana.space/picStore/20201126163455.png

消费消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
public class Consumer {
    @SneakyThrows
    public static void main(String[] args) {
        // 实例化消息消费者,指定group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cana-consumer-group");
        // 指定Namesrv地址信息
        consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        // 订阅TOPIC
//        consumer.subscribe("cana-topic", "*");
//        consumer.subscribe("cana-topic", "sync-tag||async-tag");
        consumer.subscribe("cana-topic", "sync-tag");
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.stream().forEach(msg -> log.info("msg:{}", msg));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

运行结果:

http://img.cana.space/picStore/20201126165717.png

消息体是byte数组,也可以自己转化成string。

负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

http://img.cana.space/picStore/20201126181651.png

勾选允许并行执行,开3个Consumer

20201126181720

执行space.cana.rmq.demo.base.producer.SyncProducer#main

可以看到默认就是以负载均衡的方式进行消费,如下:

http://img.cana.space/picStore/20201126181855.png

我们也可以显示地设置负载均衡模式

1
consumer.setMessageModel(MessageModel.CLUSTERING);

广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
....
consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.stream().forEach(msg -> log.info("msg:{}", msg));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
// 使用广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 启动消费者
        consumer.start();

重启3个消费者客户端,发送10条同步消息

http://img.cana.space/picStore/20201126182429.png

可以看到每个客户端都收到10条消费消息

顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

http://img.cana.space/picStore/20201126183035.png

可以采用局部消息顺序,消息队列是天然先进先出的,只要分别保证张三和李四各自的消息发到一个队列就ok,如下:

http://img.cana.space/picStore/20201126183432.png

在实际业务中,就是根据订单号进行发送的,同一个订单id发到同一个队列里面去,消费者这边是多线程消费,但是多线程会同步处理这一个队列上的消息,这样就不会错乱。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

生产者发送顺序消息

生产者根据业务标识将消息发送到一个队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package space.cana.rmq.demo.order;

import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

@Slf4j
public class OrderProducer {

    @ToString
    @Data
    @AllArgsConstructor
    static class OrderStep {
        private long orderId;
        private String desc;
    }

    private static List<OrderStep> orderList = Lists.newArrayList(
            //  1039L   : 创建    付款 推送 完成
            //  1065L   : 创建   付款
            //  7235L   :创建    付款
            new OrderStep(1039L, "创建"),
            new OrderStep(1039L, "付款"),
            new OrderStep(1039L, "推送"),
            new OrderStep(1039L, "完成"),

            new OrderStep(1065L, "创建"),
            new OrderStep(1065L, "付款"),
            new OrderStep(1065L, "推送"),
            new OrderStep(1065L, "完成"),

            new OrderStep(7235L, "创建"),
            new OrderStep(7235L, "付款"),
            new OrderStep(7235L, "推送"),
            new OrderStep(7235L, "完成")
    );

    @SneakyThrows
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("cana-order-producer-group");
        producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        producer.start();
        orderList.forEach(o -> {
            Message msg = new Message("cana-order-topic", "cana-order-tag", o.toString().getBytes());
            try {
                /**
                 * 参数一:消息对象
                 * 参数二:消息队列的选择器
                 * 参数三:选择队列的业务标识(订单ID)
                 */
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    /**
                     *
                     * @param mqs:队列集合
                     * @param msg:消息对象
                     * @param arg:业务标识的参数,也就是上面的参数三,订单ID
                     * @return
                     */
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        long orderId = (long) arg;
                        // 将orderId根据队列个数取模,这样相同的orderId会进入到相同的队列
                        return mqs.get((int) (orderId % mqs.size()));
                    }
                }, o.getOrderId());
                log.info("sendResult:{}", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        producer.shutdown();
    }
}

消费者接受顺序消息

使用MessageListenerOrderly这个监听器,在消息消费的时候,对于一个队列的消息就采取多线程同步处理的方式,这样就能保证消息消费的顺序性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package space.cana.rmq.demo.order;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;


@Slf4j
public class OrderConsumer {
    @SneakyThrows
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cana-consumer-group");
        consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        consumer.subscribe("cana-order-topic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                msgs.stream().forEach(msg ->
                        log.info("线程名称:{}, msg-body:{}",
                                Thread.currentThread().getName(),
                                new String(msg.getBody())
                        )
                );
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}

验证

先发送,再消费,查看消费者日志:

http://img.cana.space/picStore/20201126193541.png

在broker.properties中我们配置了4个队列,可以看到消费者客户端在处理3个订单的时候分别使用不同的线程线程进行消费。

注意
实际测试的时候,得先启动生产者发送消息,再启动消费者才会同一个id对应同一个线程,消费者启动后会卡一会儿才收到消息,至于原因,下次一定。

小结

生产者根据业务标识将消息发送到一个队列,消费者使用顺序监听器,对每个队列绑定一个线程进行消费处理。

延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

消息发送

1
2
3
// 设置延时等级
            // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 索引从1开始
            msg.setDelayTimeLevel(2); // 5s

消息监听

1
2
3
4
5
6
7
8
9
consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.stream().forEach(msg ->
                        log.info("msg:{},延时时间:{}", msg, (System.currentTimeMillis() - msg.getBornTimestamp()))
                );
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

需要先启动消费者再启动生产者

http://img.cana.space/picStore/20201126200811.png

使用限制

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

1
2
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

发送批量消息

如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //处理error
}

消息接受和之前没有区别

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,使用一个迭代器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
    @Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}

使用迭代器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

1
2
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

注意,还支持空参数或者*表示所有(if null or * expression,meaning subscribe all)。

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

SQL基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

1
public void subscribe(finalString topic, final MessageSelector messageSelector)

消息生产者

发送消息时,你能通过putUserProperty来设置消息的属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

消息消费者

用MessageSelector.bySql来使用sql筛选消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

事务消息

rocketmq事务消息解决的问题:解决本地数据库事务执行与消息发送的一致性或者说原子性问题。这里界限一定要明白,是确保MQ生产端正确无误地将消息发送出来,没有多发,也不会漏发。但至于发送后消费端有没有正常的消费掉,这种异常场景将由MQ消息消费失败重试机制来保证,不在此次的讨论范围内。

流程分析

http://img.cana.space/picStore/20201126221713.png

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交事务消息的补偿流程

事务消息发送及提交

  1. 发送消息(half消息)。
  2. 服务端响应消息写入结果。
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对消费端业务不可见,生产端本地逻辑不执行)。
  4. broker根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见

事务补偿

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态
  3. broker根据生产端本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况,生产端需要提供回调接口供broker进行回查。

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交状态,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚状态,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,需要生产端发出commit/rollback指令,或者超时回查来进一步确定状态是要提交还是回滚。

发送事务消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package space.cana.rmq.demo.transaction;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

@Slf4j
public class TransactionProducer {
    @SneakyThrows
    public static void main(String[] args) {
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("cana-transaction-producer-group");
        transactionMQProducer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        transactionMQProducer.setTransactionListener(new TransactionListener() {
            /**
             * 如果half消息成功提交到broker则broker会回调这个方法执行生产端后面的流程
             * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
             *
             * @param msg Half(prepare) message
             * @param arg Custom business parameter 自定义业务参数,由transactionMQProducer.sendMessageInTransaction带过来
             * @return Transaction state
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                int i = (int) arg;
                switch (i) {
                    case 1:
                        log.info("executeLocalTransaction,i:{}, 返回broker->COMMIT_MESSAGE", i);
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        log.info("executeLocalTransaction,i:{}, 返回broker->ROLLBACK_MESSAGE", i);
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        log.info("executeLocalTransaction,i:{}, 返回broker->UNKNOW", i);
                        // 当传进来的i==3时,返回未知状态,broker就会回调checkLocalTransaction方法。
                        return LocalTransactionState.UNKNOW;
                }
            }

            /**
             * broker创建half消息并成功响应生产端,生产端没有返回响应给broker,也就是上面没有返回broker一个本地事务状态LocalTransactionState,
             * 那么broker就会回调这个方法来查看本地事务状态
             * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
             * method will be invoked to get local transaction status.
             *
             * @param msg Check message
             * @return Transaction state
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                log.info("checkLocalTransaction, 返回broker->COMMIT_MESSAGE,msg-body:{}", new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        transactionMQProducer.start();
        // 开始发送消息,演示3种消息的发送 1-成功提交本地事务 2-本地事务回滚 3-本地事务状态未知,broker check之后返回成功提交
        for (int i = 1; i <= 3; i++) {
            Message msg = new Message("cana-transaction-topic", "transaction-tag", ("hello transaction msg" + i).getBytes());
            transactionMQProducer.sendMessageInTransaction(msg, i);
        }
        // 这儿别自动shutdown,需要等待broker回调
//        transactionMQProducer.shutdown();
    }
}

验证

编写消费者消费消息,成功收到1和3表示发送事务消息正常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
public class TransactionConsumer {
    @SneakyThrows
    public static void main(String[] args) {
        // 实例化消息消费者,指定group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cana-transaction-consumer-group");
        // 指定Namesrv地址信息
        consumer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        consumer.subscribe("cana-transaction-topic", "transaction-tag");
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 演示3种消息的发送 1-成功提交本地事务 2-本地事务回滚 3-本地事务状态未知,broker check之后返回成功提交
                // 所以这里应该会收到1和3
                msgs.stream().forEach(msg -> log.info("msg-body:{}", new String(msg.getBody())));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 启动消费者
        consumer.start();
    }
}

运行结果,日志:

生产者:

http://img.cana.space/picStore/20201126222014.png

消费者:

http://img.cana.space/picStore/20201126222026.png

符合预期

注意事项

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID(GROUP) 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

总结

集群工作流程

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与这些队列所在的所有Broker建立长连接从而向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟这些Broker建立连接通道,开始消费消息。