目录

rocketmq使用案例

案例介绍

模拟电商网站购物场景中的【下单】和【支付】业务

业务分析

下单

http://img.cana.space/picStore/20201126232515.png

下单流程

  1. 用户请求订单系统下单
  2. 订单系统通过RPC调用订单服务下单
  3. 订单服务调用优惠券服务,扣减优惠券
  4. 订单服务调用调用库存服务,校验并扣减库存
  5. 订单服务调用用户服务,扣减用户余额
  6. 订单服务完成确认订单

支付

http://img.cana.space/picStore/20201126233113.png

支付流程

  1. 用户请求商家支付系统
  2. 商家支付系统将用户请求重定向到第三方支付平台进行支付
  3. 用户通过第三方支付平台支付成功后,第三方支付平台回调商家支付系统
  4. 商家支付系统调用订单服务修改订单状态
  5. 商家支付系统调用积分服务添加积分
  6. 商家支付系统调用日志服务记录日志

问题分析

问题1

用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?

http://img.cana.space/picStore/20201126233525.png

解决方案有多种,如下:

  • 先说明,这里不能采用rocketmq事务消息,因为还在下单环节,调用库存或者优惠券服务等可能校验不成功失败。事务消息一般用在校验什么都通过的情况,后面的服务必须成功才行,就相当发送通知消息一样,一定要成功。

    事务消息,本地事务成功则调用的其他服务必须成功,一致性由消息重试保证,事务失败则不调其他服务,由事务消息机制保证。

  • 分布式事务(tcc、at、sega、xa)

  • 其实是分布式事务的变种,成功则直接返回下单成功,失败则通过发送失败消息给其他服务,其他服务回退处理完结后再发出回退成功消息由订单系统监听,当监听到所有服务都回退完毕后再取消订单返回下单结果,其实就相当于tcc的cancle阶段,只不过由订单系统充当了tc、tm、rm三者的角色,在分布式事务框架里tc的角色一般由一个服务器来承担,例如seta。

下面介绍使用MQ保证在下单失败后系统数据的完整性

http://img.cana.space/picStore/20201126233606.png

问题2

用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。

http://img.cana.space/picStore/20201126233656.png

上面这种方式响应第三方支付平台会很慢,商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?

通过MQ进行数据分发,提高系统处理性能

http://img.cana.space/picStore/20201126233818.png

技术分析

技术选型

  • SpringBoot 2.0.1
  • dubbo-spring-boot-starter 2.0.0
  • Zookeeper 3.5.6
  • rocketmq-spring-boot-starter,2.0.3
  • Mysql 5.7

http://img.cana.space/picStore/20201127110058.png

SpringBoot整合RocketMQ

下载rocketmq-spring项目

将rocketmq-spring安装到本地仓库,这种方式较直接依赖快点,但也麻烦点,网速快直接依赖好了

1
mvn install -Dmaven.skip.test=true

消息生产者

pom

1
2
3
4
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

配置文件

1
2
rocketmq.name-server=127.0.0.1:9876;127.0.0.1:9877
rocketmq.producer.group=boot-rmq-producer-group

启动类

1
2
3
4
5
6
@SpringBootApplication
public class ProducerMain {
    public static void main(String[] args) {
        SpringApplication.run(ProducerMain.class, args);
    }
}

测试类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void test() {
        /**
         * Convert the given Object to serialized form, possibly using a
         * {@link org.springframework.messaging.converter.MessageConverter},
         * wrap it as a message and send it to the given destination.
         * @param destination the target destination
         * @param payload the Object to use as payload
         */
        rocketMQTemplate.convertAndSend("boot-rmp-topic", "hello boot rmq");
    }
}

运行完测试程序后,查看控制台:

http://img.cana.space/picStore/20201127123009.png

消息消费者

pom

同生产者

配置文件

1
2
3
4
server.port=8080
rocketmq.name-server=127.0.0.1:9876;127.0.0.1:9877
# 这个是自己定义的变量,需要在代码中引入
rocketmq.consumer.group=boot-rmq-consumer-group

启动类

1
2
3
4
5
6
@SpringBootApplication
public class ConsumerMain {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerMain.class, args);
    }
}

消息监听器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Slf4j
@RocketMQMessageListener(
        topic = "boot-rmp-topic",
        consumeMode = ConsumeMode.ORDERLY, // 默认,负载均衡模式
        consumerGroup = "${rocketmq.consumer.group}"
)
@Component
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        log.info("接受到消息:{}", s);
    }
}

测试

producer发送一条消息,查看consumer日志:

http://img.cana.space/picStore/20201127132829.png

spring整合dubbo

zk集群搭建

参考地址

latest为3.5.6

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
version: '3.1'

services:
  zoo1:
    image: zookeeper:3.5.6
    restart: always
    hostname: zoo1
    container_name: zoo1
    ports:
      - 2181:2181
      # 访问zk实例内嵌的web控制台
      # 在configuration目录下我们可以看到zk实例的基本配置信息,比如server_id、提供客户端服务的端口、数据目录、日志目录等
      # 在stats目录下可以看到zk实例的运行状态信息,比如节点身份、数据大小、日志大小等
      - 8081:8080
    volumes:
      - ./zoo1/data:/data
      - ./zoo1/datalog:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper:3.5.6
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
      - 8082:8080
    volumes:
      - ./zoo2/data:/data
      - ./zoo2/datalog:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper:3.5.6
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
      - 8083:8080
    volumes:
      - ./zoo3/data:/data
      - ./zoo3/datalog:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

检查集群状态:

1
2
3
4
5
6
7
8
# david @ Davids-Macbook-Pro in ~/docker/compose-config/zk [14:42:41]
$ de zoo3 bash
root@zoo3:/apache-zookeeper-3.5.6-bin# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
root@zoo3:/apache-zookeeper-3.5.6-bin#

使用本地zk客户端连接zk集群

1
$ ./zkCli.sh -server localhost:2181,localhost:2182,localhost:2183
注意

由于使用了apache的dubbo,其内部集成zk客户端curator,curator会起一个线程去处理这些server信息,那么肯定是要报错,因为宿主机不认识这些网址

1
2020-11-27 16:16:57.983  INFO 35974 --- [ain-EventThread] o.a.c.framework.imps.EnsembleTracker     : New config event received: {server.1=zoo1:2888:3888:participant;0.0.0.0:2181, version=0, server.3=zoo3:2888:3888:participant;0.0.0.0:2181, server.2=0.0.0.0:2888:3888:participant;0.0.0.0:2181}

这个功能主要是做一些兼容性设置,不影响程序正常使用,我们使用客户端连接zk集群

1
2
3
4
5
$ ./zkCli.sh -server localhost:2181,localhost:2182,localhost:2183
[zk: localhost:2181,localhost:2182,localhost:2183(CONNECTED) 3] ls /
[dubbo, zookeeper]
[zk: localhost:2181,localhost:2182,localhost:2183(CONNECTED) 4] ls /dubbo
[com.eh.boot.rmq.dubbo.api.IUserService]

可以看到服务已经注册到zk中了。

为了测试方便,后面还是使用单机版的zk

dubbo-admin

dubbo管理控制台安装

启动后可以看到服务被注册进来了

http://img.cana.space/picStore/20201127165013.png

服务消费者

pom

同服务提供者,唯一区别是使用spring-boot-starter-web,这样可以通过rest方式调用进行测试

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

配置文件

和服务提供者区别,消费者不需要暴露监听端口

1
2
3
4
5
server.port=8001
spring.application.name=boot-rmq-dubbo-consumer
dubbo.application.name=boot-rmq-dubbo-consumer-dubbo
dubbo.registry.protocol=zookeeper
dubbo.registry.address=localhost:2181

启动类

1
2
3
4
5
6
7
@EnableDubbo
@SpringBootApplication
public class ConsumerMain {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerMain.class, args);
    }
}

业务类

HelloController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package com.eh.boot.rmq.dubbo.consumer.controller;

import com.eh.boot.rmq.dubbo.api.IUserService;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Reference
    private IUserService userService;

    @GetMapping("/sayHello")
    public String sayHello(String name) {
        return userService.sayHello(name);
    }
}

访问http://localhost:8001/sayHello?name=david,页面打印hello, david

查看dubbo控制台

http://img.cana.space/picStore/20201127170507.png

失败补偿机制

订单系统定义mq相关

1
2
3
4
5
6
7
8
# namesrv
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
# 生产者group
rocketmq.producer.group=orderProducerGroup
# 生产者topic
mq.order.topic=orderTopic
# tag
mq.order.tag.cancel=order_cancel

发送订单失败消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
 @Autowired
 private RocketMQTemplate rocketMQTemplate;

 @Value("${mq.order.topic}")
 private String topic;

 @Value("${mq.order.tag.cancel}")
 private String cancelTag;

...
  sendMessage(topic, 
                        cancelTag, 
                        cancelOrderMQ.getOrderId().toString(),  // key,设置一个业务标识,比如订单id
                    JSON.toJSONString(cancelOrderMQ));
    } catch (Exception e1) {
...