Redis实现分布式锁
模拟场景
库存表:
如图现有【sku-AE86】库存为100,模拟仓库系统库存扣减逻辑,每次请求库存-1
/**
* 没加锁的场景
* 故意阻塞 让多个线程去操作同一个资源,会导致资源安全问题
* 例如:
* 库存为100
* 线程一获取到库存为100,然后进行库存扣减操作或其他业务逻辑,还没来得及写回到数据库
* 此时线程二也来操作这个库存,也获取到库存为100,然后也进行库存扣减操作
* 然后线程一业务执行完毕,将修改后的库存提交的数据库 100-1=99
* 然后线程二也提交 100-1=99
* 此时进行了两次库存扣减逻辑,但是库存只减了一次
* @param
* @return com.zyf.zboot.common.result.R
* @author zhongyufeng
* @date 2022/2/9 16:19
*/
@GetMapping("/testLock1")
public R testLock1() {
SkuStock skuStock = skuStockService.getById("sku-AE86");
Long stockQuantity = skuStock.getStockQuantity();
stockQuantity -= 1;
skuStock.setStockQuantity(stockQuantity);
//故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
skuStockService.updateById(skuStock);
return R.ok();
}
遇到这种情况一般会选择使用 synchronized
加锁来保证数据安全性,但是 synchronized 只是JVM进程锁,分布式场景下部署多台服务,都不在一个进程甚至都不在一台服务器上。
使用Redis来实现分布式锁
第一版:通过redis加锁来解决上面的问题
/**
* 通过redis加锁来解决上面的问题
* @param
* @return com.zyf.zboot.common.result.R
* @author zhongyufeng
* @date 2022/2/9 21:49
*/
@GetMapping("/testLock2")
public R testLock2() {
String sku = "sku-AE86";
String lock = "LOCK";
//当没有锁的时候对ID加锁
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock);
//加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
if (!ifAbsent) {
log.error("该资源正在被操作");
return R.error("该资源正在被操作");
}
SkuStock skuStock = skuStockService.getById(sku);
Long stockQuantity = skuStock.getStockQuantity();
stockQuantity -= 1;
skuStock.setStockQuantity(stockQuantity);
//修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
skuStockService.updateById(skuStock);
//执行完成之后释放锁
redisTemplate.delete(sku);
return R.ok();
}
上面虽然加锁成功,也保证了线程一在执行的过程中,其他线程无法操作这个sku的库存,但是但是如果程序出现异常或者直接程序终止了,会造成死锁
第二版:通过在加锁时设置超时时间来解决死锁的问题,这样即使程序异常终止,也会释放锁
/**
* 问题:虽然加锁成功 但是如果程序出现异常或者直接程序终止了,会造成死锁
* 解决:通过在加锁时设置超时时间来解决死锁的问题,这样即使程序异常终止,也会释放锁
*
* @param
* @return com.zyf.zboot.common.result.R
* @author zhongyufeng
* @date 2022/2/9 22:08
*/
@GetMapping("/testLock3")
public R testLock3() {
String sku = "sku-AE86";
String lock = "LOCK";
//当没有锁的时候对ID加锁
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock);
//todo 解决方式:设置超时时间 5s
redisTemplate.expire(sku, 5, TimeUnit.SECONDS);
//加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
if (!ifAbsent) {
log.error("该资源正在被操作");
return R.error("该资源正在被操作");
}
SkuStock skuStock = skuStockService.getById(sku);
Long stockQuantity = skuStock.getStockQuantity();
stockQuantity -= 1;
skuStock.setStockQuantity(stockQuantity);
//修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
skuStockService.updateById(skuStock);
//todo 问题点:模拟异常情况,在释放锁之前程序终止,锁没有被释放,其他线程永远无法操作这条资源,造成了死锁
//这里使用 try catch finally 也没有用,因为不只是异常会终止程序,比如断电、服务器故障等问题是程序避免不了的
System.out.println(1 / 0);
//执行完成之后释放锁
redisTemplate.delete(sku);
return R.ok();
}
虽然实现了超时自动解锁,但是还是有问题,上面加锁和设置超时时间是两行代码,极端情况下,可能会出现在上锁之后 在设置超时时间之前程序终止,这样也会导致死锁
第三版:通过redis 原子性的指令 解决上面的问题
/**
* 问题:如果上锁之后 在设置超时时间之前程序终止,也会导致死锁
* 解决:通过redis 原子性的指令 解决上面的问题
*
* @param
* @return com.zyf.zboot.common.result.R
* @author zhongyufeng
* @date 2022/2/9 22:26
*/
@GetMapping("/testLock4")
public R testLock4() {
String sku = "sku-AE86";
String lock = "LOCK";
//原版:
// //当没有锁的时候对ID加锁
// Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock);
//
// //todo 问题点:如果在设置超时时间之前程序终止,也会造成死锁
// System.out.println(1 / 0);
//
// //置超时时间 5s
// redisTemplate.expire(sku, 5, TimeUnit.SECONDS);
//todo:当没有锁的时候对ID加锁,并设置超时时间 5s,此操作保证了原子性,加锁成功时就设置了超时时间
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, lock, 5, TimeUnit.SECONDS);
//加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
if (!ifAbsent) {
log.error("该资源正在被操作");
return R.error("该资源正在被操作");
}
SkuStock skuStock = skuStockService.getById(sku);
Long stockQuantity = skuStock.getStockQuantity();
stockQuantity -= 1;
skuStock.setStockQuantity(stockQuantity);
//修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
skuStockService.updateById(skuStock);
//模拟异常情况,在释放锁之前程序终止,锁没有被释放,其他线程永远无法操作这条资源,造成了死锁
System.out.println(1 / 0);
//执行完成之后释放锁
redisTemplate.delete(sku);
return R.ok();
}
这样看似没有问题了,但实际上还是不够严谨,我们来假设这个场景:
1、线程一获取到锁,进行操作,但是5秒内没有执行完成,锁被自动释放
2、锁被释放后,线程二获取到锁,然后进行操作
3、当线程二还没操作完的时候,线程一执行完毕,代码执行锁释放操作,此时就造成了 线程一释放了线程二的锁
4、当线程二的锁被释放后,线程三获取到锁,然后进行操作
5、当线程三正在操作的时候,线程二又执行完成了,然后执行释放锁操作,此时就造成了 线程二释放了线程三的锁
6、依次类推。。。。一旦出现这种情况,在高并发的场景下,这个锁就完全失效了
第四版本:通过uuid的方式 在释放锁之前先判断是否为自己的锁,并使用lua脚本执行整个过程,保证原子性
/**
* 问题:
* 1、线程一获取到锁,进行操作,但是5秒内没有执行完成,锁被自动释放
* 2、锁被释放后,线程二获取到锁,然后进行操作
* 3、当线程二还没操作完的时候,线程一执行完毕,代码执行锁释放操作,此时就造成了 线程一释放了线程二的锁
* 4、当线程二的锁被释放后,线程三获取到锁,然后进行操作
* 5、当线程三正在操作的时候,线程二又执行完成了,然后执行释放锁操作,此时就造成了 线程二释放了线程三的锁
* 6、依次类推。。。。一旦出现这种情况,在高并发的场景下,这个锁就完全失效了、
*
* 解决:
* 方式1、通过uuid的方式 在释放锁之前先判断是否为自己的锁,并使用lua脚本执行整个过程,保证原子性
* 方式2、获取到锁之后,在执行的过程中,开启一个子线程,定时为锁续时长,定时周期推荐为超时时间的1/3
* @return
* @author zhongyufeng
* @date 2022/2/9 22:42
*/
@GetMapping("/testLock5")
public R testLock5() {
String sku = "sku-AE86";
String uuid = UUID.randomUUID().toString();
//当没有锁的时候对ID加锁,并设置超时时间 5s,此操作保证了原子性,加锁成功时就设置了超时时间
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(sku, uuid, 5, TimeUnit.SECONDS);
//加锁之后,其他线程操作这条数据时会直接提示该资源正在被操作
if (!ifAbsent) {
log.error("该资源正在被操作");
return R.error("该资源正在被操作");
}
SkuStock skuStock = skuStockService.getById(sku);
Long stockQuantity = skuStock.getStockQuantity();
stockQuantity -= 1;
skuStock.setStockQuantity(stockQuantity);
//修改之前 故意阻塞,模拟分布式多线程的情况,让其他线程操作这条数据
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
skuStockService.updateById(skuStock);
//模拟异常情况,在释放锁之前程序终止,锁没有被释放,其他线程永远无法操作这条资源,造成了死锁
// System.out.println(1 / 0);
//使用lua脚本解锁,解决了会接开别人锁的问题
unlockLua(sku, uuid);
return R.ok();
}
/**
* 使用lua脚本解锁,不会解除别人锁
*
* @return
*/
public boolean unlockLua(String key, String uuid) {
if (key == null || uuid == null) {
return false;
}
DefaultRedisScript<Long> redisScript = new DefaultRedisScript();
//用于解锁的lua脚本位置
redisScript.setLocation(new ClassPathResource("unlock.lua"));
redisScript.setResultType(Long.class);
//没有指定序列化方式,默认使用上面配置的
Object result = redisTemplate.execute(redisScript, Arrays.asList(key), uuid);
return result.equals(Long.valueOf(1));
}
lua脚本(放在resources下即可,文件名:unlock.lua)
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 奇怪的阿峰
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果