前言及问题重现
参考:幂等实现_基于乐观锁
zk实现分布式锁基本原理
因为zk的节点是具有互斥性的,唯一性的,所以一旦一个节点被创建,就不能再被创建,利用这个特性,每次线程进入,都进行节点锁判断,如果锁节点被创建,我们就创建监听,等待其他线程执行完业务释放锁,然后我们再获取锁,执行自己的业务;
执行流程

很明显以上流程会出现羊群效应,比如有100个线程进入,只可能有1个线程能获取锁,导致其他99个线程创建监听和等待;当第一个线程执行完业务代码,释放锁,其他98个线程有去抢占资源,重复前面的操作;性能很低下。
实际开发我们用的是顺序临时节点(EPHEMERAL_SEQUENTIAL);

基于前面的大致流程,再细化下:
step01:线程进入,直接再/lock下创建顺序临时节点;
step02:判断自己是不是/lock节点下序号最小的节点;if 是 获取锁 else 否 对前面一个节点进行监听;
step03:获取锁后,执行业务逻辑,执行完,释放锁(delete删除节点),然后后面一个会得到通知,继续重复step02即可;
基于zk的分布式锁实现
参考:Curator -> Getting Started
环境说明
- zk:3.5.8
- zk客户端 curator :5.1.0
1
2
|
$ docker pull zookeeper:3.5.8
$ docker run -p 2181:2181 -d --name zookeeper ce
|
搭建工程
pom:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<!-- Zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<!--解决日志冲突,排除self4j-log4j12-->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--curator-recipes已经包含curator-framework依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
|
往容器中添加zk客户端
CuratorConfig
1
2
3
4
5
6
7
8
9
10
11
|
@Configuration
public class CuratorConfig {
// http://curator.apache.org/getting-started.html
@Bean(initMethod = "start")
CuratorFramework curatorFramework() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
return client;
}
}
|
改造乐观锁工程,去掉version
1
2
3
|
<update id="updateOrderById">
update t_order set status=#{status} where order_id = #{orderId}
</update>
|
改造RechargeServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
@Slf4j
@Service
public class RechargeServiceImpl implements RechargeService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountMapper accountMapper;
@Autowired
private CuratorFramework curatorFramework;
@Transactional
@SneakyThrows
@Override
public boolean recharge(Integer orderId, Integer amount) {
InterProcessLock lock = new InterProcessMutex(curatorFramework, "/order_" + orderId);
try {
log.info("尝试获取锁");
lock.acquire(); // 获取锁
log.info("成功获取锁,执行业务处理逻辑");
return processRecharge(orderId, amount);
} finally {
lock.release();
}
}
@SneakyThrows
private boolean processRecharge(Integer orderId, Integer amount) {
log.info("查询订单");
Order order = orderMapper.getByOrderId(orderId);
if (order.getStatus() == 0) { // check订单支付状态
log.info("未支付状态");
TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(10, 100)); // 模拟计算时间
order.setStatus(1);
log.info("更新支付状态...");
int affectRow = orderMapper.updateOrderById(order);
if (affectRow == 1) {
log.info("账户充值...");
accountMapper.recharge(order.getAccountId(), order.getAmount());
} else {
log.info("更新支付状态失败,数据过期");
return false;
}
} else {
log.info("发现订单已处理");
return true;
}
return false;
}
}
|
测试:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Test
@SneakyThrows
public void testProblem() {
int retryTimes = 10; // 模拟客户端请求次数
CountDownLatch latch = new CountDownLatch(retryTimes);
Runnable runnable = () -> {
rechargeService.recharge(1, 10);
latch.countDown();
};
// 模拟多次重试情况
IntStream.range(0, retryTimes).forEach(i -> new Thread(runnable, "testProblem-" + i).start());
latch.await();
log.info("=====>账户余额:{}", accountMapper.getAccountById(1).getBalance());
}
|
执行日志:

可以看到只有第一个获取到锁的线程成功充值,其他线程均发现订单已处理,账户余额符合预期。