案例介绍
模拟电商网站购物场景中的【下单】和【支付】业务
业务分析
下单

下单流程
- 用户请求订单系统下单
- 订单系统通过RPC调用订单服务下单
- 订单服务调用优惠券服务,扣减优惠券
- 订单服务调用调用库存服务,校验并扣减库存
- 订单服务调用用户服务,扣减用户余额
- 订单服务完成确认订单
支付

支付流程
- 用户请求商家支付系统
- 商家支付系统将用户请求重定向到第三方支付平台进行支付
- 用户通过第三方支付平台支付成功后,第三方支付平台回调商家支付系统
- 商家支付系统调用订单服务修改订单状态
- 商家支付系统调用积分服务添加积分
- 商家支付系统调用日志服务记录日志
问题分析
问题1
用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。如何保证数据的完整性?

解决方案有多种,如下:
-
先说明,这里不能采用rocketmq事务消息,因为还在下单环节,调用库存或者优惠券服务等可能校验不成功失败。事务消息一般用在校验什么都通过的情况,后面的服务必须成功才行,就相当发送通知消息一样,一定要成功。
事务消息,本地事务成功则调用的其他服务必须成功,一致性由消息重试保证,事务失败则不调其他服务,由事务消息机制保证。
-
分布式事务(tcc、at、sega、xa)
-
其实是分布式事务的变种,成功则直接返回下单成功,失败则通过发送失败消息给其他服务,其他服务回退处理完结后再发出回退成功消息由订单系统监听,当监听到所有服务都回退完毕后再取消订单返回下单结果,其实就相当于tcc的cancle阶段,只不过由订单系统充当了tc、tm、rm三者的角色,在分布式事务框架里tc的角色一般由一个服务器来承担,例如seta。
下面介绍使用MQ保证在下单失败后系统数据的完整性

问题2
用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。

上面这种方式响应第三方支付平台会很慢,商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?
通过MQ进行数据分发,提高系统处理性能

技术分析
技术选型
- SpringBoot 2.0.1
- dubbo-spring-boot-starter 2.0.0
- Zookeeper 3.5.6
- rocketmq-spring-boot-starter,2.0.3
- Mysql 5.7

SpringBoot整合RocketMQ
下载rocketmq-spring项目
将rocketmq-spring安装到本地仓库,这种方式较直接依赖快点,但也麻烦点,网速快直接依赖好了
1
|
mvn install -Dmaven.skip.test=true
|
消息生产者
pom
1
2
3
4
|
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
|
配置文件
1
2
|
rocketmq.name-server=127.0.0.1:9876;127.0.0.1:9877
rocketmq.producer.group=boot-rmq-producer-group
|
启动类
1
2
3
4
5
6
|
@SpringBootApplication
public class ProducerMain {
public static void main(String[] args) {
SpringApplication.run(ProducerMain.class, args);
}
}
|
测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void test() {
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter},
* wrap it as a message and send it to the given destination.
* @param destination the target destination
* @param payload the Object to use as payload
*/
rocketMQTemplate.convertAndSend("boot-rmp-topic", "hello boot rmq");
}
}
|
运行完测试程序后,查看控制台:

消息消费者
pom
同生产者
配置文件
1
2
3
4
|
server.port=8080
rocketmq.name-server=127.0.0.1:9876;127.0.0.1:9877
# 这个是自己定义的变量,需要在代码中引入
rocketmq.consumer.group=boot-rmq-consumer-group
|
启动类
1
2
3
4
5
6
|
@SpringBootApplication
public class ConsumerMain {
public static void main(String[] args) {
SpringApplication.run(ConsumerMain.class, args);
}
}
|
消息监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Slf4j
@RocketMQMessageListener(
topic = "boot-rmp-topic",
consumeMode = ConsumeMode.ORDERLY, // 默认,负载均衡模式
consumerGroup = "${rocketmq.consumer.group}"
)
@Component
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("接受到消息:{}", s);
}
}
|
测试
producer发送一条消息,查看consumer日志:

spring整合dubbo
zk集群搭建
参考地址
latest为3.5.6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
version: '3.1'
services:
zoo1:
image: zookeeper:3.5.6
restart: always
hostname: zoo1
container_name: zoo1
ports:
- 2181:2181
# 访问zk实例内嵌的web控制台
# 在configuration目录下我们可以看到zk实例的基本配置信息,比如server_id、提供客户端服务的端口、数据目录、日志目录等
# 在stats目录下可以看到zk实例的运行状态信息,比如节点身份、数据大小、日志大小等
- 8081:8080
volumes:
- ./zoo1/data:/data
- ./zoo1/datalog:/datalog
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo2:
image: zookeeper:3.5.6
restart: always
hostname: zoo2
container_name: zoo2
ports:
- 2182:2181
- 8082:8080
volumes:
- ./zoo2/data:/data
- ./zoo2/datalog:/datalog
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo3:
image: zookeeper:3.5.6
restart: always
hostname: zoo3
container_name: zoo3
ports:
- 2183:2181
- 8083:8080
volumes:
- ./zoo3/data:/data
- ./zoo3/datalog:/datalog
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
|
检查集群状态:
1
2
3
4
5
6
7
8
|
# david @ Davids-Macbook-Pro in ~/docker/compose-config/zk [14:42:41]
$ de zoo3 bash
root@zoo3:/apache-zookeeper-3.5.6-bin# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
root@zoo3:/apache-zookeeper-3.5.6-bin#
|
使用本地zk客户端连接zk集群
1
|
$ ./zkCli.sh -server localhost:2181,localhost:2182,localhost:2183
|
注意
由于使用了apache的dubbo,其内部集成zk客户端curator,curator会起一个线程去处理这些server信息,那么肯定是要报错,因为宿主机不认识这些网址
1
|
2020-11-27 16:16:57.983 INFO 35974 --- [ain-EventThread] o.a.c.framework.imps.EnsembleTracker : New config event received: {server.1=zoo1:2888:3888:participant;0.0.0.0:2181, version=0, server.3=zoo3:2888:3888:participant;0.0.0.0:2181, server.2=0.0.0.0:2888:3888:participant;0.0.0.0:2181}
|
这个功能主要是做一些兼容性设置,不影响程序正常使用,我们使用客户端连接zk集群
1
2
3
4
5
|
$ ./zkCli.sh -server localhost:2181,localhost:2182,localhost:2183
[zk: localhost:2181,localhost:2182,localhost:2183(CONNECTED) 3] ls /
[dubbo, zookeeper]
[zk: localhost:2181,localhost:2182,localhost:2183(CONNECTED) 4] ls /dubbo
[com.eh.boot.rmq.dubbo.api.IUserService]
|
可以看到服务已经注册到zk中了。
为了测试方便,后面还是使用单机版的zk
dubbo-admin
dubbo管理控制台安装
启动后可以看到服务被注册进来了

服务消费者
pom
同服务提供者,唯一区别是使用spring-boot-starter-web,这样可以通过rest方式调用进行测试
1
2
3
4
|
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
|
配置文件
和服务提供者区别,消费者不需要暴露监听端口
1
2
3
4
5
|
server.port=8001
spring.application.name=boot-rmq-dubbo-consumer
dubbo.application.name=boot-rmq-dubbo-consumer-dubbo
dubbo.registry.protocol=zookeeper
dubbo.registry.address=localhost:2181
|
启动类
1
2
3
4
5
6
7
|
@EnableDubbo
@SpringBootApplication
public class ConsumerMain {
public static void main(String[] args) {
SpringApplication.run(ConsumerMain.class, args);
}
}
|
业务类
HelloController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package com.eh.boot.rmq.dubbo.consumer.controller;
import com.eh.boot.rmq.dubbo.api.IUserService;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Reference
private IUserService userService;
@GetMapping("/sayHello")
public String sayHello(String name) {
return userService.sayHello(name);
}
}
|
访问http://localhost:8001/sayHello?name=david
,页面打印hello, david
查看dubbo控制台

失败补偿机制
订单系统定义mq相关
1
2
3
4
5
6
7
8
|
# namesrv
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
# 生产者group
rocketmq.producer.group=orderProducerGroup
# 生产者topic
mq.order.topic=orderTopic
# tag
mq.order.tag.cancel=order_cancel
|
发送订单失败消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${mq.order.topic}")
private String topic;
@Value("${mq.order.tag.cancel}")
private String cancelTag;
...
sendMessage(topic,
cancelTag,
cancelOrderMQ.getOrderId().toString(), // key,设置一个业务标识,比如订单id
JSON.toJSONString(cancelOrderMQ));
} catch (Exception e1) {
...
|