模拟场景

库存表:

库存表.png

如图现有【sku-AE86】库存为100,模拟仓库系统库存扣减逻辑,每次请求库存-1

    /**
     * 没加锁的场景
     * 故意阻塞 让多个线程去操作同一个资源,会导致资源安全问题
     * 例如:
     *  库存为100
     *  线程一获取到库存为100,然后进行库存扣减操作或其他业务逻辑,还没来得及写回到数据库
     *  此时线程二也来操作这个库存,也获取到库存为100,然后也进行库存扣减操作
     *  然后线程一业务执行完毕,将修改后的库存提交的数据库 100-1=99
     *  然后线程二也提交 100-1=99
     *  此时进行了两次库存扣减逻辑,但是库存只减了一次
     * @param
     * @return com.zyf.zboot.common.result.R
     * @author zhongyufeng
     * @date 2022/2/9 16:19
     */
    @GetMapping("/testLock1")
    public R testLock1() {

        SkuStock skuStock = skuStockService.getById("sku-AE86");

        Long stockQuantity = skuStock.getStockQuantity();
        stockQuantity -= 1;

        skuStock.setStockQuantity(stockQuantity);

        //故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        skuStockService.updateById(skuStock);
        return R.ok();
    }

遇到这种情况一般会选择使用 synchronized 加锁来保证数据安全性,但是 synchronized 只是JVM进程锁,分布式场景下部署多台服务,都不在一个进程甚至都不在一台服务器上。



使用Redis来实现分布式锁

第一版:通过redis加锁来解决上面的问题

    /**
     * 通过redis加锁来解决上面的问题
     * @param
     * @return com.zyf.zboot.common.result.R
     * @author zhongyufeng
     * @date 2022/2/9 21:49
     */
    @GetMapping("/testLock2")
    public R testLock2() {

        String sku = "sku-AE86";
        String lock = "LOCK";

        //当没有锁的时候对ID加锁
        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock);

        //加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
        if (!ifAbsent) {
            log.error("该资源正在被操作");
            return R.error("该资源正在被操作");
        }

        SkuStock skuStock = skuStockService.getById(sku);

        Long stockQuantity = skuStock.getStockQuantity();
        stockQuantity -= 1;
        skuStock.setStockQuantity(stockQuantity);

        //修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        skuStockService.updateById(skuStock);

        //执行完成之后释放锁
        redisTemplate.delete(sku);

        return R.ok();
    }

上面虽然加锁成功,也保证了线程一在执行的过程中,其他线程无法操作这个sku的库存,但是但是如果程序出现异常或者直接程序终止了,会造成死锁


第二版:通过在加锁时设置超时时间来解决死锁的问题,这样即使程序异常终止,也会释放锁

    /**
     * 问题:虽然加锁成功 但是如果程序出现异常或者直接程序终止了,会造成死锁
     * 解决:通过在加锁时设置超时时间来解决死锁的问题,这样即使程序异常终止,也会释放锁
     *
     * @param
     * @return com.zyf.zboot.common.result.R
     * @author zhongyufeng
     * @date 2022/2/9 22:08
     */
    @GetMapping("/testLock3")
    public R testLock3() {

        String sku = "sku-AE86";
        String lock = "LOCK";

        //当没有锁的时候对ID加锁
        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock);

        //todo 解决方式:设置超时时间 5s
        redisTemplate.expire(sku, 5, TimeUnit.SECONDS);

        //加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
        if (!ifAbsent) {
            log.error("该资源正在被操作");
            return R.error("该资源正在被操作");
        }

        SkuStock skuStock = skuStockService.getById(sku);

        Long stockQuantity = skuStock.getStockQuantity();
        stockQuantity -= 1;
        skuStock.setStockQuantity(stockQuantity);

        //修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        skuStockService.updateById(skuStock);

        //todo 问题点:模拟异常情况,在释放锁之前程序终止,锁没有被释放,其他线程永远无法操作这条资源,造成了死锁
        //这里使用 try catch finally 也没有用,因为不只是异常会终止程序,比如断电、服务器故障等问题是程序避免不了的
        System.out.println(1 / 0);

        //执行完成之后释放锁
        redisTemplate.delete(sku);

        return R.ok();
    }

虽然实现了超时自动解锁,但是还是有问题,上面加锁和设置超时时间是两行代码,极端情况下,可能会出现在上锁之后 在设置超时时间之前程序终止,这样也会导致死锁


第三版:通过redis 原子性的指令 解决上面的问题

    /**
     * 问题:如果上锁之后 在设置超时时间之前程序终止,也会导致死锁
     * 解决:通过redis 原子性的指令 解决上面的问题
     *
     * @param
     * @return com.zyf.zboot.common.result.R
     * @author zhongyufeng
     * @date 2022/2/9 22:26
     */
    @GetMapping("/testLock4")
    public R testLock4() {

        String sku = "sku-AE86";
        String lock = "LOCK";


        //原版:
//        //当没有锁的时候对ID加锁
//        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock);
//
//        //todo 问题点:如果在设置超时时间之前程序终止,也会造成死锁
//        System.out.println(1 / 0);
//
//        //置超时时间 5s
//        redisTemplate.expire(sku, 5, TimeUnit.SECONDS);


        //todo:当没有锁的时候对ID加锁,并设置超时时间 5s,此操作保证了原子性,加锁成功时就设置了超时时间
        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock, 5, TimeUnit.SECONDS);

        //加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
        if (!ifAbsent) {
            log.error("该资源正在被操作");
            return R.error("该资源正在被操作");
        }

        SkuStock skuStock = skuStockService.getById(sku);

        Long stockQuantity = skuStock.getStockQuantity();
        stockQuantity -= 1;
        skuStock.setStockQuantity(stockQuantity);

        //修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        skuStockService.updateById(skuStock);

        //模拟异常情况,在释放锁之前程序终止,锁没有被释放,其他线程永远无法操作这条资源,造成了死锁
        System.out.println(1 / 0);

        //执行完成之后释放锁
        redisTemplate.delete(sku);

        return R.ok();
    }

这样看似没有问题了,但实际上还是不够严谨,我们来假设这个场景:
1、线程一获取到锁,进行操作,但是5秒内没有执行完成,锁被自动释放
2、锁被释放后,线程二获取到锁,然后进行操作
3、当线程二还没操作完的时候,线程一执行完毕,代码执行锁释放操作,此时就造成了 线程一释放了线程二的锁
4、当线程二的锁被释放后,线程三获取到锁,然后进行操作
5、当线程三正在操作的时候,线程二又执行完成了,然后执行释放锁操作,此时就造成了 线程二释放了线程三的锁
6、依次类推。。。。一旦出现这种情况,在高并发的场景下,这个锁就完全失效了


第四版本:通过uuid的方式 在释放锁之前先判断是否为自己的锁,并使用lua脚本执行整个过程,保证原子性

    /**
     * 问题:
     *  1、线程一获取到锁,进行操作,但是5秒内没有执行完成,锁被自动释放
     *  2、锁被释放后,线程二获取到锁,然后进行操作
     *  3、当线程二还没操作完的时候,线程一执行完毕,代码执行锁释放操作,此时就造成了 线程一释放了线程二的锁
     *  4、当线程二的锁被释放后,线程三获取到锁,然后进行操作
     *  5、当线程三正在操作的时候,线程二又执行完成了,然后执行释放锁操作,此时就造成了 线程二释放了线程三的锁
     *  6、依次类推。。。。一旦出现这种情况,在高并发的场景下,这个锁就完全失效了、
     *
     * 解决:
     *  方式1、通过uuid的方式 在释放锁之前先判断是否为自己的锁,并使用lua脚本执行整个过程,保证原子性
     *  方式2、获取到锁之后,在执行的过程中,开启一个子线程,定时为锁续时长,定时周期推荐为超时时间的1/3
     * @return
     * @author zhongyufeng
     * @date 2022/2/9 22:42
     */
    @GetMapping("/testLock5")
    public R testLock5() {

        String sku = "sku-AE86";
        String uuid = UUID.randomUUID().toString();

        //当没有锁的时候对ID加锁,并设置超时时间 5s,此操作保证了原子性,加锁成功时就设置了超时时间
        Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, uuid, 5, TimeUnit.SECONDS);

        //加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
        if (!ifAbsent) {
            log.error("该资源正在被操作");
            return R.error("该资源正在被操作");
        }

        SkuStock skuStock = skuStockService.getById(sku);

        Long stockQuantity = skuStock.getStockQuantity();
        stockQuantity -= 1;
        skuStock.setStockQuantity(stockQuantity);

        //修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        skuStockService.updateById(skuStock);

        //模拟异常情况,在释放锁之前程序终止,锁没有被释放,其他线程永远无法操作这条资源,造成了死锁
//        System.out.println(1 / 0);

        //使用lua脚本解锁,解决了会接开别人锁的问题
        unlockLua(sku, uuid);

        return R.ok();
    }
    /**
     * 使用lua脚本解锁,不会解除别人锁
     *
     * @return
     */
    public boolean unlockLua(String key, String uuid) {

        if (key == null || uuid == null) {
            return false;
        }
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript();
        //用于解锁的lua脚本位置
        redisScript.setLocation(new ClassPathResource("unlock.lua"));
        redisScript.setResultType(Long.class);
        //没有指定序列化方式,默认使用上面配置的
        Object result = redisTemplate.execute(redisScript, Arrays.asList(key), uuid);
        return result.equals(Long.valueOf(1));
    }

lua脚本(放在resources下即可,文件名:unlock.lua)

if redis.call('get',KEYS[1]) == ARGV[1] then
    return redis.call('del',KEYS[1])
else
    return 0
end