目录

幂等实现_基于redis分布式锁

前言及问题重现

参考:幂等实现_基于乐观锁

高效分布式锁

当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。

1、互斥

在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。

2、防止死锁

在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。

所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。

3、性能

对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。

所以在锁的设计时,需要考虑两点。

1、锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。

2、锁的范围尽量要小。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。

4、可重入

我们知道ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。

针对以上Redisson都能很好的满足,下面就来分析下它。

redis实现分布式锁的三个核心要素

加锁

最简单的方法是使用setnx命令。key是锁的唯一标识,按业务来决定命名,value为当前线程的线程ID。

比如想要给一种商品的秒杀活动加锁,可以给key命名为 “lock_sale_ID” 。而value设置成什么呢?我们可以姑且设置成1。加锁的伪代码如下:

setnx(key,1)当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁,当其他线程执行setnx返回0,说明key已经存在,该线程抢锁失败。

解锁

有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行del指令,伪代码如下:

del(key)释放锁之后,其他线程就可以继续执行setnx命令来获得锁。

锁超时

如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。

所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令,expire(key, 30)

Redisson实现分布式锁基本原理

  • 官网:https://redisson.org/
  • github开源地址:https://github.com/redisson/redisson/

原理图

http://img.cana.space/picStore/20201124103652.png

源码中加锁lua代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
if (redis.call('exists', KEYS[1]) == 0) then 
        redis.call('hset', KEYS[1], ARGV[2], 1);
         redis.call('pexpire', KEYS[1], ARGV[1]); 
         return nil;
          end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return nil;
        end;
return redis.call('pttl', KEYS[1]);

分析

为什么要使用lua语言

因为一大堆复杂的逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂逻辑执行的原子性。原子性是由redis保证的,redis会为lua脚本执行创建伪客户端模拟客户端调用redis执行命令,伪客户端执行lua脚本是排他的。

官网:Redis保证以原子方式执行脚本,执行脚本时不会执行其他脚本或Redis命令,类似于给执行lua脚本这段代码加了锁 。

lua字段解释

  • KEYS[1]:表示你加锁的那个key,比如说RLock lock = redisson.getLock(“myLock”);这里你自己设置了加锁的那个锁key就是“myLock”。
  • ARGV[1]:表示锁的有效期,默认30s
  • ARGV[2]:表示表示加锁的客户端ID,类似于这样:8743c9c0-0795-4907-87fd-6c719a6b4586:1

加锁机制

lua中第一个if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

如何加锁呢?很简单,用下面的hset命令:

1
hset myLock 8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

此时的myLock锁key的数据结构是:

1
2
3
4
myLock:
{
  8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
}

接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒(默认)

锁互斥机制

如果在这个时候,另一个客户端(客户端2)来尝试加锁,执行了同样的一段lua脚本,会怎样呢?

第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。

接着第二个if判断会执行“hexists mylock 客户端id”,来判断myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。

比如还剩15000毫秒的生存时间。此时客户端2会进入一个while循环,不停的尝试加锁。

可重入加锁机制

如果客户端1已经持有这把锁,可重入的加锁会怎么样呢

1
2
3
4
5
6
7
8
#重入加锁
RLock lock = redisson.getLock("myLock")
lock.lock();
//业务代码
lock.lock();
//业务代码
lock.unlock();
lock.unlock();

分析lua代码,第一个if判断不成立,“exists myLock” 会显示锁key已经存在了

第二个if会成立,因为myLock的hash数据结构中包含的客户端1的ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此时就会执行可重入加锁的逻辑,用incrby这个命令,对客户端1的加锁次数,累加1:

1
incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1  1

此时myLock数据结构变为下面这样:

1
2
3
4
myLock:
{
  8743c9c0-0795-4907-87fd-6c719a6b4586:1  2
}

释放锁机制

Redisson释放锁的lua代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
if (redis.call('exists', KEYS[1]) == 0) then
       redis.call('publish', KEYS[2], ARGV[1]);
        return 1; 
        end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
     return nil;
     end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then
     redis.call('pexpire', KEYS[1], ARGV[2]); 
     return 0; 
else redis.call('del', KEYS[1]); 
     redis.call('publish', KEYS[2], ARGV[1]); 
     return 1;
     end;
return nil;

执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。就是每次都对myLock数据结构中的那个加锁次数减1。如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key。然后另外的客户端2就可以尝试完成加锁了。

这就是所谓的分布式锁的开源Redisson框架的实现机制。一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。

watch dog自动延期机制

锁超时机制避免了获取锁的线程所在服务器宕机后一直释放不了锁的问题,但同时引入了一个新问题,假设我们使用默认的超时时间30s,极端情况下一个业务处理超过30s还没释放锁,那么此时就会出现多个线程同时获取到锁进而执行业务的情况,就有可能导致数据不一致。

看门狗就是为了防止这种情况出现的,Redisson中客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了。不会延长锁的有效期!

来看源码:

org.redisson.config.Config

1
private long lockWatchdogTimeout = 30 * 1000;

可以看到,这个加的分布式锁的超时时间默认是30秒.但是还有一个问题,那就是这个看门狗,多久来延长一次有效期呢?我们往下看

org.redisson.RedissonLock#scheduleExpirationRenewal

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

renewExpiration

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

可以看出,获取锁成功就会开启一个定时任务,也就是watchdog,定时任务会定期检查去续期(这里定时用的是netty-common包中的HashedWheelTimer)。

io.netty.util.HashedWheelTimer#newTimeout

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    } else if (unit == null) {
        throw new NullPointerException("unit");
    } else {
        long pendingTimeoutsCount = this.pendingTimeouts.incrementAndGet();
        if (this.maxPendingTimeouts > 0L && pendingTimeoutsCount > this.maxPendingTimeouts) {
            this.pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending timeouts (" + this.maxPendingTimeouts + ")");
        } else {
            this.start();
            long deadline = System.nanoTime() + unit.toNanos(delay) - this.startTime;
            if (delay > 0L && deadline < 0L) {
                deadline = 9223372036854775807L;
            }

            HashedWheelTimer.HashedWheelTimeout timeout = new HashedWheelTimer.HashedWheelTimeout(this, task, deadline);
            this.timeouts.add(timeout);
            return timeout;
        }
    }
}

通过源码分析我们知道,默认情况下,加锁的时间是30秒.如果加锁的业务没有执行完,那么有效期到 30-10 = 20秒的时候,就会进行一次续期,把锁重置成30秒.那这个时候可能又有同学问了,那业务的机器万一宕机了呢?宕机了定时任务跑不了,就续不了期,那自然30秒之后锁就解开了呗.

使用redis作为分布式锁的缺点

最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。此时就会导致多个客户端对一个分布式锁完成了加锁。这时系统在业务上一定会出现问题,导致脏数据的产生。

所以这个就是redis cluster或者是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。

如何解决redis分布式锁主节点宕机锁丢失的问题

redis->distlock

事实上这类琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

在Redis的master节点上拿到了锁;但是这个加锁的key还没有同步到slave节点;master故障,发生故障转移,slave节点升级为master节点;导致锁丢失。

正因为如此,Redis作者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。笔者认为,Redlock也是Redis所有分布式锁实现方式中唯一能让面试官高潮的方式。

Redlock实现,antirez提出的redlock算法大概是这样的:

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:

获取当前Unix时间,以毫秒为单位。依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

redisson已经有对redlock算法封装,可以直接使用。

小结:和paxos过半机制道理是类似的,由一个节点变成多个节点,提高分区容错性。

基于redisson的分布式锁实现

参考:redisson -> Distributed locks

环境说明

  • redis:6.0.8
  • redisson:3.13.4

安装redis

1
2
$ docker pull redis:6.0.8
$ docker run -p 6379:6379 -d --name redis 16

搭建工程

RedissonConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Configuration
public class RedissonConfig {

    @Bean
    Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}

改造业务处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
@Service
public class RechargeServiceImpl implements RechargeService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private AccountMapper accountMapper;

    @Autowired
    private Redisson redisson;

    @Transactional
    @SneakyThrows
    @Override
    public boolean recharge(Integer orderId, Integer amount) {

        RLock lock = redisson.getLock("order_" + orderId); // new一个Redisson实例
        try {
            log.info("尝试获取锁");
            lock.lock(3, TimeUnit.SECONDS); // 尝试加锁并设置超时时间3s
            log.info("成功获取锁,执行业务处理逻辑");
            return processRecharge(orderId, amount);
        } finally {
            lock.unlock();
        }
    }

    @SneakyThrows
    private boolean processRecharge(Integer orderId, Integer amount) {
        log.info("查询订单");
        Order order = orderMapper.getByOrderId(orderId);
        if (order.getStatus() == 0) { // check订单支付状态
            log.info("未支付状态");
            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(10, 100)); // 模拟计算时间
            order.setStatus(1);
            log.info("更新支付状态...");
            int affectRow = orderMapper.updateOrderById(order);
            if (affectRow == 1) {
                log.info("账户充值...");
                accountMapper.recharge(order.getAccountId(), order.getAmount());
            } else {
                log.info("更新支付状态失败,数据过期");
                return false;
            }
        } else {
            log.info("发现订单已处理");
            return true;
        }
        return false;
    }
}

测试程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@Test
@SneakyThrows
public void testProblem() {
    int retryTimes = 10; // 模拟客户端请求次数
    CountDownLatch latch = new CountDownLatch(retryTimes);
    Runnable runnable = () -> {
        rechargeService.recharge(1, 5);
        latch.countDown();
    };
    // 模拟多次重试情况
    IntStream.range(0, retryTimes).forEach(i -> new Thread(runnable, "testProblem-" + i).start());
    latch.await();
    log.info("=====>账户余额:{}", accountMapper.getAccountById(1).getBalance());
}

日志:

http://img.cana.space/picStore/20201124101828.png

可以看到只有第一个获取到锁的线程成功充值,其他线程均发现订单已处理,账户余额符合预期。

基于jedis客户端实现分布式锁

设计思路:利用redis的setnx和getset原子性设置key和过期时间,通过自旋锁方式实现重试

直接上代码了,步骤说明见注释

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.eh.cana.redisdistributedlock.kit;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.nosql.redis.RedisDS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class RedisLockService {

    private final long DEFAULT_OPERATION_CONNECTION_TIMEOUT = 2000L;
    private final long DEFAULT_OPERATION_READ_TIMEOUT = 2000L;

    @Autowired
    private RedisDS redisDS;

    private final ThreadLocal<String> tl = new ThreadLocal<>();

    public boolean acquireLock(String key) {
        return acquireLock(key, DEFAULT_OPERATION_CONNECTION_TIMEOUT, DEFAULT_OPERATION_READ_TIMEOUT);
    }

    /**
     * 获取锁
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @param key
     * @param connectionTimeout 等待处理时间,也就是自旋锁最长循环时间
     * @param readTimeout       业务处理时间,如果检测到前一个线程超过业务处理时间则del原来的key并对所发起竞争
     * @return
     */
    public boolean acquireLock(String key, long connectionTimeout, long readTimeout) {
        Assert.notBlank(key);
        Jedis jedis = redisDS.getJedis();
        try {
            log.info("{}尝试获取锁", key);
            while (connectionTimeout > 0) {
                // 一定要保证集群内所有服务器时钟同步,之前因为运维没有配置这个导致了一次生产故障
                // 当前时间戳
                long currentTimeMillis = System.currentTimeMillis();
                // 计算这个key的超时时间点
                long expire = currentTimeMillis + readTimeout + 1;
                // 尝试获取锁
                if (jedis.setnx(key, String.valueOf(expire)) == 1) {
                    log.info("{} 获得锁, currentTimeMillis:{}, expire:{}", key, currentTimeMillis, expire);
                    tl.set(key);
                    return true;
                }
                // 如果没获取到锁需要检测锁是否超时, 
                String previousExpireStr = jedis.get(key);
              //1
                // 如果检测到锁已超时则对发起竞争,注意此刻可能会有多个线程同时抢锁,所以需要原子方法getset
                if (previousExpireStr != null && System.currentTimeMillis() > Long.valueOf(previousExpireStr)) {
                    // 极端情况可能会有覆盖超时时间情况,但是影响不大,无非就是key的超时时间本来是15:59:20.300变成15:59:20.320这样
                  // 1. 经过前面设置,t1超时时间点为320,t2执超时时间点为300
                  // 2. t1和t2同时执行到1处,获取旧的时间戳是3s前
                  // 3. t1和t2同时到达2处,t2先执行,redis被更新为300,t1再执行被更新为320,虽然过期时间有被覆盖,但是没啥影响,因为后面还有cas check,不会出现t1和t2同时获取锁的情况。
                  // 2
                    String old = jedis.getSet(key, String.valueOf(expire)); //2
                    // 说明这次是自己成功抢到锁,可以执行业务了
                    if (old != null && old.equals(previousExpireStr)) {
                        log.info("{} 获得锁(已kill前一个超时key), currentTimeMillis:{}, expire:{}", key, currentTimeMillis, expire);
                        tl.set(key);
                        return true;
                    }
                }
                // 到这里说明锁还未超时也就是有key对应的业务正在处理,需要等待锁释放
                log.debug("{}阻塞中...", key);
                // 随机等待1-10ms,防止线程饥饿。即,当同时到达多个进程,只会有一个进程获得锁,其他的都用同样的频率进行尝试,
                // 后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                // 使用随机等待时间可以在一定程度上保证公平性
                long randomWait = RandomUtil.randomInt(1, 10);
                TimeUnit.MILLISECONDS.sleep(randomWait);
                connectionTimeout -= randomWait;
            }
            log.error("{}等待处理超时", key);

        } catch (Throwable throwable) {
            log.error(key + "获取锁异常", throwable);
        } finally {
            // 释放连接
            jedis.close();
        }
        return false;
    }

    public void releaseLock() {
        Jedis jedis = redisDS.getJedis();
        try {
            String key = tl.get();
            if (!StringUtils.isEmpty(key) && jedis.del(key) == 1) {
                log.info("业务处理完毕,释放锁:{}", key);
                tl.remove();
            } else {
                log.error("释放锁异常:{}", StringUtils.isEmpty(key) ? "key为空" : key + ",删除结果不为1");
            }
        } catch (Exception e) {
            log.error("释放锁异常", e);
        } finally {
            jedis.close();
        }
    }
}

测试结果:

http://img.cana.space/picStore/20201124135346.png

结果符合预期

缺点:和之前的redisson一样,存在redis主从切换时可能的多个线程同时处理相同业务情况。

解决思路:参考redlock


上面使用System.nanoTime()这种方式有问题,这个时间是cpu的时钟计数器,每台机器不一样,要么使用毫秒使用,要么摈弃这种方式,使用redis2.8后新特性,set nx ex原子操作,改过以后变成:

redis2.8以前按照上述方式编码,是为了防止setnx完之后没有expire,与redis服务器断开连接,之后key一直得不到释放出现问题,所以利用setnx(key,过期时间),getset cap操作,替代这一原子操作,2.8以后支持原子操作,那么这种方式也就可以改成下面这种比较简单的实现了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Slf4j
@Service
public class RedisLockService {

    @Autowired
    private RedisLockCommand redisLockCommand;


    /**
     * 纳秒
     */
    private static final long MILLI_MICRO_CONVERSION = 1000L;
    /**
     * 默认锁竞争时间, 2s
     */
    private final long DEFAULT_OPERATION_CONNECTION_TIMEOUT = 5000L;
    /**
     * 默认业务执行时间, 3s
     */
    private final long DEFAULT_OPERATION_READ_TIMEOUT = 10000L;

    private final ThreadLocal<String> tl = new ThreadLocal<>();

    public boolean acquireLock(String key) {
        return acquireLock(key, DEFAULT_OPERATION_CONNECTION_TIMEOUT, DEFAULT_OPERATION_READ_TIMEOUT);
    }

    /**
     * 获取锁
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @param key
     * @param connectionTimeout 等待处理时间,也就是自旋锁最长循环时间
     * @param readTimeout       业务处理时间,如果检测到前一个线程超过业务处理时间则del原来的key并对所发起竞争
     * @return
     */
    public boolean acquireLock(String key, long connectionTimeout, long readTimeout) {
        Assert.notNull(key, "lock key can not be null");
        try {
            log.info("{}尝试获取锁", key);
            long wait = connectionTimeout * MILLI_MICRO_CONVERSION;
            while (wait > 0) {
                // 尝试获取锁
                if (redisLockCommand.setnx(key, "1", readTimeout, TimeUnit.MILLISECONDS)) {
                    log.info("{} 获得锁, currentTimeMillis:{}", key, System.currentTimeMillis());
                    tl.set(key);
                    return true;
                }
                // 到这里说明锁还未超时或者key对应的业务正在处理,需要等待锁释放
                log.debug("{}阻塞中...", key);
                // 随机等待1-100us,防止线程饥饿。即,当同时到达多个进程,只会有一个进程获得锁,其他的都用同样的频率进行尝试,
                // 后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                // 使用随机等待时间可以在一定程度上保证公平性
                long randomWait = RandomUtils.nextLong(1, 100);
                TimeUnit.MICROSECONDS.sleep(randomWait);
                wait -= randomWait;
            }
            log.error("{}等待处理超时", key);

        } catch (Throwable throwable) {
            log.error(key + "获取锁异常", throwable);
            throw new IntentionException(ErrorCode.Acquire_Lock_Error, throwable);
        }
        return false;
    }

    public void releaseLock() {
        try {
            String key = tl.get();
            // 如果key为空表示获取锁超时,直接return
            if (StringUtils.isEmpty(key)) {
                return;
            }
            if (redisLockCommand.del(key) == 1) {
                log.info("业务处理完毕,释放锁:{}", key);
                tl.remove();
            } else {
                log.error("释放锁异常, 删除结果不为1");
            }
        } catch (Exception e) {
            log.error("释放锁异常", e);
        }
    }
}