下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

时间:2023-03-08 21:39:33

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

上面的代码更新库存的数据,存在多线程的问题,第一种方法使用synchronized关键字修饰的语句块代码,但是性能较低,并且还是存在问题的

在分布式的场景下,当前库存系统部署在多个tomcat上,即使加了同步锁,也会存在问题,一个线程访问tomcat1,另外一个线程同时访问tomcat2,两个都是进行减少库存操作也是存在问题的,synchronized同步不能跨jvm

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

上面的代码在一个jvm进程下面解决多线程是没有问题的,但是在分布式环境下部署多个tomcat下部署多个库存微服务,使用synchronized是存在问题的

我们可以使用下面的架构来进行测试,测试上面的代码不正确

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

nginx负载代码后面的两个tomcat

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

8080和8090就是nginx反向代理两个tomcat

nginx的配置如下:

#user  nobody;
worker_processes 1; #error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info; #pid logs/nginx.pid; events {
worker_connections 1024;
} http {
include mime.types;
default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"'; #access_log logs/access.log main;
log_format json '{"@timestamp":"$time_iso8601",'
'"remote_addr":"$remote_addr",'
'"status":$status,'
'"bodysize":$body_bytes_sent,'
'"referer":"$http_referer",'
'"ua":"$http_user_agent",'
'"handletime":$request_time,'
'"url":"$uri"}';
access_log logs/access.log;
access_log logs/access.json.log json; sendfile on;
#tcp_nopush on; #keepalive_timeout 0;
keepalive_timeout 65; #gzip on; #引入自定义配置文件
include reverse-procy.conf; upstream redislock{
server 127.0.0.1:6666 weight=1;
server 127.0.0.1:7777 weight=1;
} server {
listen 8088;
server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / {
root html;
index index.html index.htm;
proxy_pass http://redislock;
} #error_page 404 /404.html; # redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html; } # another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias; # location / {
# root html;
# index index.html index.htm;
# }
#} # HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost; # ssl_certificate cert.pem;
# ssl_certificate_key cert.key; # ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m; # ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on; # location / {
# root html;
# index index.html index.htm;
# }
#} }

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

第二个参数0表示同一时刻发起200次请求,配置成5,表示在5秒内会总共发器200次请求 ,配置成1,表示1秒钟之内发起200次请求

在端口为7777这台机器上打印结果如下

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

在端口为6666这台机器上打印结果如下

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

我们来看下上面具体的代码配置

两台机器上存在相同的记录,说明存在两台机器对同一个库存进行操作的情况,说明上面的代码存在问题,我们可以使用redis的senx命令来解决

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

分布式锁要注意解决下面的几个问题:

1、释放锁其实只需要把锁的key删除即可,使用del xxx指令。不过,仔细思考,如果在我们执行del之前,
服务突然宕机,那么锁岂不是永远无法删除了?!
为了避免因服务宕机引起锁无法释放问题,我们可以在获取锁的时候,给锁加一个有效时间,当时间超
出时,就会自动释放锁,这样就不会死锁了

并且要保证锁被删除,要放在try  finally中

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

步骤如下:
1、通过set命令设置锁
2、判断返回结果是否是OK
1)Nil,获取失败,结束或重试(自旋锁)
2)OK,获取锁成功
执行业务
释放锁,DEL 删除key即可
3、异常情况,服务宕机。超时时间EX结束,会自动释放锁

2、大家思考一下,释放锁就是用DEL语句把锁对应的key给删除,有没有这么一种可能性:
1. 3个进程:A和B和C,在执行任务,并争抢锁,此时A获取了锁,并设置自动过期时间为10s
2. A开始执行业务,因为某种原因,业务阻塞,耗时超过了10秒,此时锁自动释放了
3. B恰好此时开始尝试获取锁,因为锁已经自动释放,成功获取锁
4. A此时业务执行完毕,执行释放锁逻辑(删除key),于是B的锁被释放了,而B其实还在执行业务
5. 此时进程C尝试获取锁,也成功了,因为A把B的锁删除了。
问题出现了:B和C同时获取了锁,违反了互斥性!
如何解决这个问题呢?我们应该在删除锁之前,判断这个锁是否是自己设置的锁,如果不是(例如自己
的锁已经超时释放),那么就不要删除了。
那么问题来了:如何得知当前获取锁的是不是自己呢?
我们可以在set 锁时,存入当前线程的唯一标识!删除锁前,判断下里面的值是不是与自己标识释放一
致,如果不一致,说明不是自己的锁,就不要删除了。 这里通过线程id来实现

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

package com.itheima.security.distributed.uaa;

import java.util.UUID;
import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; /**
* @author Administrator
* @version 1.0
**/ @RestController
public class OrderController { @Autowired
Environment environment; @Autowired
StringRedisTemplate redisTemplate; @GetMapping(value = "/deduce_stock")
public String deduce_stock(){
String thread_id = UUID.randomUUID().toString();
String product_id = "001"; try{
//商品名称,对同一个商品进行减少库存的操作
Boolean setIfAbsent = redisTemplate.opsForValue().setIfAbsent(product_id,thread_id, 10, TimeUnit.MINUTES);
if(!setIfAbsent){ return "库存正在被操作,请稍等";
}
int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock > 0){
int real_stock = stock -1;
redisTemplate.opsForValue().set("stock", real_stock+"");
System.out.println(environment.getProperty("local.server.port")+"扣减库存成功,剩余库存为:"+real_stock); }else{
System.out.println("扣减库存失败,库存不足:");
} }finally{ //释放锁,不是自己的锁,不能删除掉
if(thread_id.equalsIgnoreCase(redisTemplate.opsForValue().get(product_id))){
System.out.println(environment.getProperty("local.server.port")+"扣减库存成功,释放锁:");
redisTemplate.delete(product_id);
} } return "end";
} @RequestMapping(value = "/aa")
public String deduce_stock1(){ return "end" +environment.getProperty("local.server.port");
} }

上面的问题还存在一个问题,上面的锁不是可重入锁,如果我们在获取锁以后,执行代码的过程中,再次尝试获取锁,执行setnx肯定会失败,因为锁已经存在
了。这样有可能导致死锁,这样的锁就是不可重入的。
如何解决呢?
当然是想办法改造成可重入锁。
3.4.1.重入锁
什么叫做可重入锁呢?
可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获
取到该锁。换一种说法:同一个线程再次进入同步代码时,可以使用自己已获取到的锁。
可重入锁可以避免因同一线程中多次获取锁而导致死锁发生。

那么,如何实现可重入锁呢?
获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取,
而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果锁进入了多次,在最内层直接删除锁,
导致外部的业务在没有锁的情况下执行,会有安全问题。因此必须获取锁时累计重入的次数,释
放时则减去重入次数,如果减到0,则可以删除锁.
因此,存储在锁中的信息就必须包含:key、线程标识、重入次数。不能再使用简单的key-value结构,
这里推荐使用hash结构:
key:lock
hashKey:线程信息
hashValue:重入次数,默认1

可以使用hset命令

需要用到的一些Redis命令包括:
EXISTS key:判断一个Key是否存在
HEXISTS key field:判断一个hash的field是否存在
HSET key field value :给一个hash的field设置一个值
HINCRBY key field increment:给一个hash的field值增加指定数值
EXPIRE key seconds:给一个key设置过期时间
DEL key:删除指定key

下面我们假设锁的key为“ lock ”,hashKey是当前线程的id:“ threadId ”,锁自动释放时间假设为20
获取锁的步骤:
1、判断lock是否存在 EXISTS lock
存在,说明有人获取锁了,下面判断是不是自己的锁
判断当前线程id作为hashKey是否存在: HEXISTS lock threadId
不存在,说明锁已经有了,且不是自己获取的,锁获取失败,end
存在,说明是自己获取的锁,重入次数+1: HINCRBY lock threadId 1 ,去到步骤3
2、不存在,说明可以获取锁, HSET key threadId 1
3、设置锁自动释放时间, EXPIRE lock 20
释放锁的步骤:
1、判断当前线程id作为hashKey是否存在: HEXISTS lock threadId
不存在,说明锁已经失效,不用管了
存在,说明锁还在,重入次数减1: HINCRBY lock threadId -1 ,获取新的重入次数
2、判断重入次数是否为0:
为0,说明锁全部释放,删除key: DEL lock

大于0,说明锁还在使用,重置有效时间: EXPIRE lock 20

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

面探讨的Redis锁实现方案都忽略了一个非常重要的问题:原子性问题。无论是获取锁,还是释放锁
的过程,都是有多行Redis指令来完成的,如果不能保证这些Redis命令执行的原子性,则整个过程都是
不安全的。
而Redis中支持以Lua脚本来运行多行命令,并且保证整个脚本运行的原子性。
接下来,我们分几块来学习Lua脚本的使用:
Redis中如何执行Lua脚本
Lua脚本的基本语法
编写上述分布式锁对应的Lua脚本

redis使用lua脚本

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

我们来看下整个工程的代码

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

lock.lua

local key = KEYS[1]
local threadId = ARGV[1]
local releaseTime = ARGV[2] if(redis.call('exists', key) == 0)
then
redis.call('hset', key, threadId, '1')
redis.call('expire', key, releaseTime)
return 1
end if(redis.call('hexists', key, threadId) == 1)
then
redis.call('hincrby', key, threadId, '1')
redis.call('expire', key, releaseTime)
return 1
end
return 0

unlock.lua

local key = KEYS[1]
local threadId = ARGV[1]
local releaseTime = ARGV[2] if (redis.call('HEXISTS', key, threadId) == 0) then
return nil
end
local count = redis.call('HINCRBY', key, threadId, -1) if (count > 0) then
redis.call('EXPIRE', key, releaseTime)
return nil
else
redis.call('DEL', key)
return nil
end

注意lua脚本中不能存在中文

RedisLock.java

package cn.itcast.demo.lock;

/**
* @author 虎哥
*/
public interface RedisLock {
/**
* 获取锁
* @param releaseTime 锁的自动释放时间
* @return 获取锁是否成功
*/
boolean tryLock(long releaseTime); /**
* 释放锁
*/
void unlock();
}

ReentrantRedisLock.java

package cn.itcast.demo.lock;

import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import java.util.Collections;
import java.util.UUID; public class ReentrantRedisLock implements RedisLock { private StringRedisTemplate redisTemplate;
/**
* 设定好锁对应的 key
*/
private String key; /**
* 存入的线程信息的前缀,防止与其它JVM中线程信息冲突
*/
private final String ID_PREFIX = UUID.randomUUID().toString(); public ReentrantRedisLock(StringRedisTemplate redisTemplate, String key) {
this.redisTemplate = redisTemplate;
this.key = key;
} private static final DefaultRedisScript<Long> LOCK_SCRIPT;
private static final DefaultRedisScript<Object> UNLOCK_SCRIPT;
static {
// 加载释放锁的脚本
LOCK_SCRIPT = new DefaultRedisScript<>();
LOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
LOCK_SCRIPT.setResultType(Long.class); // 加载释放锁的脚本
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
}
// 锁释放时间
private String releaseTime; public boolean tryLock(long releaseTime) {
// 记录释放时间
this.releaseTime = String.valueOf(releaseTime);
// 执行脚本
Long result = redisTemplate.execute(
LOCK_SCRIPT,
Collections.singletonList(key),
ID_PREFIX + Thread.currentThread().getId(), this.releaseTime); // 判断结果
return result != null && result.intValue() == 1;
} @Override
public void unlock() {
// 执行脚本
redisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(key),
ID_PREFIX + Thread.currentThread().getId(), this.releaseTime);
}
}

RedisLockFactory

package cn.itcast.demo.lock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; /**
* @author 虎哥
*/
@Component
public class RedisLockFactory { @Autowired
private StringRedisTemplate redisTemplate; public RedisLock getReentrantLock(String key){
return new ReentrantRedisLock(redisTemplate, key);
}
}

我们通过Spring提供的RedisTemplate来操作lua脚本, RedisTemplate 中提供了一个方法,用来执行Lua脚本:

我们定义一个定时任务,模拟清理订单的任务:
OrderController

package cn.itcast.demo.lock;

import java.util.UUID;
import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; /**
* @author Administrator
* @version 1.0
**/ @RestController
public class OrderController { @Autowired
Environment environment; @Autowired
private RedisLockFactory factory; @GetMapping(value = "/deduce_stock")
public String deduce_stock() throws InterruptedException{ // 获取锁对象
RedisLock lock = factory.getReentrantLock("lock1");
// 尝试获取锁
boolean isLock = lock.tryLock(50);
if(!isLock){
// 获取锁失败
return "error";
} try{ clearOrder(); }finally{ lock.unlock(); } return "end";
} @RequestMapping(value = "/aa")
public String deduce_stock1(){ return "end" +environment.getProperty("local.server.port");
} public void clearOrder() throws InterruptedException {
System.out.println("开始清理订单");
Thread.sleep(500);
System.out.println("开始恢复库存");
}
}

上面的代码已经解决了分布式锁的问题,但是在集群的环境下还存在问题

单点的redis无法保证高可用,因此一般我们都会给redis搭建主从集群。但是,主从集群无法保证分布式
锁的高可用特性。
在Redis官网上,也对这种单点故障做了说明:
在这种场景(主从结构)中存在明显的竞态:
1. 客户端A从master获取到锁
2. 在master将锁同步到slave之前,master宕掉了。
3. slave节点被晋级为master节点
4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!
有时候程序就是这么巧,比如说正好一个节点挂掉的时候,多个客户端同时取到了锁。如果你可以
接受这种小概率错误,那用这个基于复制的方案就完全没有问题。
因此,Redis的作者又给出了一种新的算法来解决整个高可用问题,即Redlock算法,摘抄了算法的介绍
如下: 我们可以采用看门狗(watch dog)解决锁超时问题,/开启一个任务,这个任务在 获取锁之后10秒后,重
新向redis发起请求,重置有效期,重新执行expire

3.7.Redission
如果按照Redlock算法来实现分布式锁,加上各种安全控制,代码会比较复杂。而开源的Redission框架
就帮我们实现了各种基于Redis的分布式锁,包括Redlock锁。

1)依赖
使用起来非常方便,首先引入依赖:
2)配置
然后通过Java配置的方式,设置Redis的地址,构建RedissionClient客户端:

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.6</version>
</dependency>

2)配置
然后通过Java配置的方式,设置Redis的地址,构建RedissionClient客户端:

/**
* @author 虎哥
*/
@Configuration3)常用API介绍:
RedissClient中定义了常见的锁:
获取锁对象后,可以通过 tryLock() 方法获取锁:
有3个重载的方法,可以控制锁是否需要重试来获取:
三个参数:获取锁,设置锁等待时间 waitTime 、释放时间 leaseTime ,时间单位 unit 。
如果获取锁失败后,会在 waitTime 减去获取锁用时的剩余时间段内继续尝试获取锁,如果依
然获取失败,则认为获取锁失败;
获取锁后,如果超过 leaseTime 未释放,为避免死锁会自动释放。
两个参数:获取锁,设置锁等待时间 time 、时间单位 unit 。释放时间 leaseTime 按照默认的30s
空参:获取锁, waitTime 默认0s,即获取锁失败不重试, leaseTime 默认30s
任务执行完毕,使用 unlock() 方法释放锁:
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地

config.useSingleServer()
.setAddress("redis://192.168.150.101:6379");
// 创建客户端
return Redisson.create(config);
}
}

4)完整案例
使用Redission来代替我们之前自定义锁的测试案例:

LockDemoApplication

package cn.itcast.demo;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling
@SpringBootApplication
public class LockDemoApplication { public static void main(String[] args) {
SpringApplication.run(LockDemoApplication.class, args);
} public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地
config.useSingleServer() .setAddress("redis://127.0.0.1:6379");
// 创建客户端
return Redisson.create(config);
}
} }

OrderController

package cn.itcast.demo.lock;

import java.util.UUID;
import java.util.concurrent.TimeUnit; import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; /**
* @author Administrator
* @version 1.0
**/ @RestController
public class OrderController { @Autowired
Environment environment; @Autowired
RedissonClient redissonClient; @Autowired
private RedisLockFactory factory; @GetMapping(value = "/deduce_stock")
public String deduce_stock() throws InterruptedException{ // 获取锁对象
RLock lock = redissonClient.getLock("lock");
// 尝试获取锁
boolean isLock = lock.tryLock();
if(!isLock){
// 获取锁失败
return "error";
} try{ clearOrder(); }finally{ lock.unlock(); } return "end";
} @RequestMapping(value = "/aa")
public String deduce_stock1(){ return "end" +environment.getProperty("local.server.port");
} public void clearOrder() throws InterruptedException {
System.out.println("开始清理订单");
Thread.sleep(500);
System.out.println("开始恢复库存");
}
}

上面的代码就能够解决在集群环境下分布式锁失效的问题

pom.xml文件如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<groupId>cn.itcast.demo</groupId>
<artifactId>lock-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>lock-demo</name>
<description>Demo project for Spring Boot</description> <properties>
<java.version>1.8</java.version>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.6</version>
</dependency> <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build> </project>

第二种解决方案,使用数据库的行级别所来解决

下订单更新订单表然后减少库存表中的数据,出现库存超卖,使用数据库和redis坚决库存超卖的问题

下面我们假设锁的key为“ lock ”,hashKey是当前线程的id:“ threadId ”,锁自动释放时间假设为20
获取锁的步骤:
1、判断lock是否存在 EXISTS lock
存在,说明有人获取锁了,下面判断是不是自己的锁
判断当前线程id作为hashKey是否存在: HEXISTS lock threadId
不存在,说明锁已经有了,且不是自己获取的,锁获取失败,end
存在,说明是自己获取的锁,重入次数+1: HINCRBY lock threadId 1 ,去到步骤3
2、不存在,说明可以获取锁, HSET key threadId 1
3、设置锁自动释放时间, EXPIRE lock 20
释放锁的步骤:
1、判断当前线程id作为hashKey是否存在: HEXISTS lock threadId
不存在,说明锁已经失效,不用管了
存在,说明锁还在,重入次数减1: HINCRBY lock threadId -1 ,获取新的重入次数
2、判断重入次数是否为0:
为0,说明锁全部释放,删除key: DEL lock

序号 命令及描述
1 EVAL script numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
2 EVALSHA sha1 numkeys key [key ...] arg [arg ...] 执行 Lua 脚本。
3 SCRIPT EXISTS script [script ...] 查看指定的脚本是否已经被保存在缓存当中。
4 SCRIPT FLUSH 从脚本缓存中移除所有脚本。
5 SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
6 SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

大于0,说明锁还在使用,重置有效时间: EXPIRE lock 20