目录

幂等实现_基于zk分布式锁

前言及问题重现

参考:幂等实现_基于乐观锁

zk实现分布式锁基本原理

因为zk的节点是具有互斥性的,唯一性的,所以一旦一个节点被创建,就不能再被创建,利用这个特性,每次线程进入,都进行节点锁判断,如果锁节点被创建,我们就创建监听,等待其他线程执行完业务释放锁,然后我们再获取锁,执行自己的业务;

执行流程

http://img.cana.space/picStore/20201124010942.png

很明显以上流程会出现羊群效应,比如有100个线程进入,只可能有1个线程能获取锁,导致其他99个线程创建监听和等待;当第一个线程执行完业务代码,释放锁,其他98个线程有去抢占资源,重复前面的操作;性能很低下。

实际开发我们用的是顺序临时节点(EPHEMERAL_SEQUENTIAL);

http://img.cana.space/picStore/20201124011152.png

基于前面的大致流程,再细化下:

step01:线程进入,直接再/lock下创建顺序临时节点;

step02:判断自己是不是/lock节点下序号最小的节点;if 是 获取锁 else 否 对前面一个节点进行监听;

step03:获取锁后,执行业务逻辑,执行完,释放锁(delete删除节点),然后后面一个会得到通知,继续重复step02即可;

基于zk的分布式锁实现

参考:Curator -> Getting Started

环境说明

  • zk:3.5.8
  • zk客户端 curator :5.1.0
1
2
$ docker pull zookeeper:3.5.8
$ docker run -p 2181:2181 -d --name zookeeper ce

搭建工程

pom:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
<!-- Zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>${zookeeper.version}</version>
    <!--解决日志冲突,排除self4j-log4j12-->
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--curator-recipes已经包含curator-framework依赖-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>${curator.version}</version>
</dependency>

往容器中添加zk客户端

CuratorConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Configuration
public class CuratorConfig {

    // http://curator.apache.org/getting-started.html
    @Bean(initMethod = "start")
    CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        return client;
    }
}

改造乐观锁工程,去掉version

1
2
3
    <update id="updateOrderById">
        update t_order set status=#{status} where order_id = #{orderId}
    </update>

改造RechargeServiceImpl

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@Service
public class RechargeServiceImpl implements RechargeService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private AccountMapper accountMapper;

    @Autowired
    private CuratorFramework curatorFramework;

    @Transactional
    @SneakyThrows
    @Override
    public boolean recharge(Integer orderId, Integer amount) {

        InterProcessLock lock = new InterProcessMutex(curatorFramework, "/order_" + orderId);
        try {
            log.info("尝试获取锁");
            lock.acquire(); // 获取锁
            log.info("成功获取锁,执行业务处理逻辑");
            return processRecharge(orderId, amount);
        } finally {
            lock.release();
        }
    }

    @SneakyThrows
    private boolean processRecharge(Integer orderId, Integer amount) {
        log.info("查询订单");
        Order order = orderMapper.getByOrderId(orderId);
        if (order.getStatus() == 0) { // check订单支付状态
            log.info("未支付状态");
            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(10, 100)); // 模拟计算时间
            order.setStatus(1);
            log.info("更新支付状态...");
            int affectRow = orderMapper.updateOrderById(order);
            if (affectRow == 1) {
                log.info("账户充值...");
                accountMapper.recharge(order.getAccountId(), order.getAmount());
            } else {
                log.info("更新支付状态失败,数据过期");
                return false;
            }
        } else {
            log.info("发现订单已处理");
            return true;
        }
        return false;
    }
}

测试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Test
@SneakyThrows
public void testProblem() {
    int retryTimes = 10; // 模拟客户端请求次数
    CountDownLatch latch = new CountDownLatch(retryTimes);
    Runnable runnable = () -> {
        rechargeService.recharge(1, 10);
        latch.countDown();
    };
    // 模拟多次重试情况
    IntStream.range(0, retryTimes).forEach(i -> new Thread(runnable, "testProblem-" + i).start());
    latch.await();
    log.info("=====>账户余额:{}", accountMapper.getAccountById(1).getBalance());
}

执行日志:

http://img.cana.space/picStore/20201124011931.png

可以看到只有第一个获取到锁的线程成功充值,其他线程均发现订单已处理,账户余额符合预期。