基于redis 实现分布式锁(二)

时间:2023-03-09 03:09:21
基于redis 实现分布式锁(二)

https://blog.****.net/xiaolyuh123/article/details/78551345

分布式锁的解决方式

  1. 基于数据库表做乐观锁,用于分布式锁。(适用于小并发)
  2. 使用memcached的add()方法,用于分布式锁。
  3. 使用memcached的cas()方法,用于分布式锁。(不常用)
  4. 使用redis的setnx()、expire()方法,用于分布式锁。
  5. 使用redis的setnx()、get()、getset()方法,用于分布式锁。
  6. 使用redis的watch、multi、exec命令,用于分布式锁。(不常用)
  7. 使用zookeeper,用于分布式锁。(不常用)

这里主要介绍第四种和第五种:

前文提供的两种方式其实都有些问题,要么是死锁,要么是依赖服务器时间同步。从Redis 2.6.12 版本开始, SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果。这样我们的可以将加锁操作用一个set命令来实现,直接是原子性操作,既没有死锁的风险,也不依赖服务器时间同步,可以完美解决这两个问题。
在redis文档上有详细说明:
http://doc.redisfans.com/string/set.html

使用redis的SET resource-name anystring NX EX max-lock-time 方式,用于分布式锁

原理

命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。

客户端执行以上的命令:

  • 如果服务器返回 OK ,那么这个客户端获得锁。
  • 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
  • 设置的过期时间到达之后,锁将自动释放。

可以通过以下修改,让这个锁实现更健壮:

  • 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
  • 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
    这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。

以下是一个简单的解锁脚本示例:

  1. if redis.call("get",KEYS[1]) == ARGV[1]
  2. then
  3. return redis.call("del",KEYS[1])
  4. else
  5. return 0
  6. end

可能存在的问题

占时没发现

具体实现

锁具体实现RedisLock:

  1. package com.xiaolyuh.lock;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.dao.DataAccessException;
  5. import org.springframework.data.redis.connection.RedisConnection;
  6. import org.springframework.data.redis.core.RedisCallback;
  7. import org.springframework.data.redis.core.StringRedisTemplate;
  8. import org.springframework.data.redis.core.script.RedisScript;
  9. import org.springframework.util.Assert;
  10. import org.springframework.util.StringUtils;
  11. import redis.clients.jedis.Jedis;
  12. import redis.clients.jedis.JedisCluster;
  13. import redis.clients.jedis.Protocol;
  14. import redis.clients.util.SafeEncoder;
  15. import java.util.ArrayList;
  16. import java.util.List;
  17. import java.util.Random;
  18. import java.util.UUID;
  19. /**
  20. * Redis分布式锁
  21. * 使用 SET resource-name anystring NX EX max-lock-time 实现
  22. * <p>
  23. * 该方案在 Redis 官方 SET 命令页有详细介绍。
  24. * http://doc.redisfans.com/string/set.html
  25. * <p>
  26. * 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性,
  27. * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中:
  28. * <p>
  29. * EX seconds — 以秒为单位设置 key 的过期时间;
  30. * PX milliseconds — 以毫秒为单位设置 key 的过期时间;
  31. * NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
  32. * XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。
  33. * <p>
  34. * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
  35. * <p>
  36. * 客户端执行以上的命令:
  37. * <p>
  38. * 如果服务器返回 OK ,那么这个客户端获得锁。
  39. * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
  40. *
  41. * @author yuhao.wangwang
  42. * @version 1.0
  43. * @date 2017年11月3日 上午10:21:27
  44. */
  45. public class RedisLock3 {
  46. private static Logger logger = LoggerFactory.getLogger(RedisLock3.class);
  47. private StringRedisTemplate redisTemplate;
  48. /**
  49. * 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
  50. */
  51. public static final String NX = "NX";
  52. /**
  53. * seconds — 以秒为单位设置 key 的过期时间,等效于EXPIRE key seconds
  54. */
  55. public static final String EX = "EX";
  56. /**
  57. * 调用set后的返回值
  58. */
  59. public static final String OK = "OK";
  60. /**
  61. * 默认请求锁的超时时间(ms 毫秒)
  62. */
  63. private static final long TIME_OUT = 100;
  64. /**
  65. * 默认锁的有效时间(s)
  66. */
  67. public static final int EXPIRE = 60;
  68. /**
  69. * 解锁的lua脚本
  70. */
  71. public static final String UNLOCK_LUA;
  72. static {
  73. StringBuilder sb = new StringBuilder();
  74. sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
  75. sb.append("then ");
  76. sb.append(" return redis.call(\"del\",KEYS[1]) ");
  77. sb.append("else ");
  78. sb.append(" return 0 ");
  79. sb.append("end ");
  80. UNLOCK_LUA = sb.toString();
  81. }
  82. /**
  83. * 锁标志对应的key
  84. */
  85. private String lockKey;
  86. /**
  87. * 记录到日志的锁标志对应的key
  88. */
  89. private String lockKeyLog = "";
  90. /**
  91. * 锁对应的值
  92. */
  93. private String lockValue;
  94. /**
  95. * 锁的有效时间(s)
  96. */
  97. private int expireTime = EXPIRE;
  98. /**
  99. * 请求锁的超时时间(ms)
  100. */
  101. private long timeOut = TIME_OUT;
  102. /**
  103. * 锁标记
  104. */
  105. private volatile boolean locked = false;
  106. final Random random = new Random();
  107. /**
  108. * 使用默认的锁过期时间和请求锁的超时时间
  109. *
  110. * @param redisTemplate
  111. * @param lockKey 锁的key(Redis的Key)
  112. */
  113. public RedisLock3(StringRedisTemplate redisTemplate, String lockKey) {
  114. this.redisTemplate = redisTemplate;
  115. this.lockKey = lockKey + "_lock";
  116. }
  117. /**
  118. * 使用默认的请求锁的超时时间,指定锁的过期时间
  119. *
  120. * @param redisTemplate
  121. * @param lockKey 锁的key(Redis的Key)
  122. * @param expireTime 锁的过期时间(单位:秒)
  123. */
  124. public RedisLock3(StringRedisTemplate redisTemplate, String lockKey, int expireTime) {
  125. this(redisTemplate, lockKey);
  126. this.expireTime = expireTime;
  127. }
  128. /**
  129. * 使用默认的锁的过期时间,指定请求锁的超时时间
  130. *
  131. * @param redisTemplate
  132. * @param lockKey 锁的key(Redis的Key)
  133. * @param timeOut 请求锁的超时时间(单位:毫秒)
  134. */
  135. public RedisLock3(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
  136. this(redisTemplate, lockKey);
  137. this.timeOut = timeOut;
  138. }
  139. /**
  140. * 锁的过期时间和请求锁的超时时间都是用指定的值
  141. *
  142. * @param redisTemplate
  143. * @param lockKey 锁的key(Redis的Key)
  144. * @param expireTime 锁的过期时间(单位:秒)
  145. * @param timeOut 请求锁的超时时间(单位:毫秒)
  146. */
  147. public RedisLock3(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
  148. this(redisTemplate, lockKey, expireTime);
  149. this.timeOut = timeOut;
  150. }
  151. /**
  152. * 尝试获取锁 超时返回
  153. *
  154. * @return
  155. */
  156. public boolean tryLock() {
  157. // 生成随机key
  158. lockValue = UUID.randomUUID().toString();
  159. // 请求锁超时时间,纳秒
  160. long timeout = timeOut * 1000000;
  161. // 系统当前时间,纳秒
  162. long nowTime = System.nanoTime();
  163. while ((System.nanoTime() - nowTime) < timeout) {
  164. if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
  165. locked = true;
  166. // 上锁成功结束请求
  167. return true;
  168. }
  169. // 每次请求等待一段时间
  170. seleep(10, 50000);
  171. }
  172. return locked;
  173. }
  174. /**
  175. * 尝试获取锁 立即返回
  176. *
  177. * @return 是否成功获得锁
  178. */
  179. public boolean lock() {
  180. lockValue = UUID.randomUUID().toString();
  181. //不存在则添加 且设置过期时间(单位ms)
  182. String result = set(lockKey, lockValue, expireTime);
  183. return OK.equalsIgnoreCase(result);
  184. }
  185. /**
  186. * 以阻塞方式的获取锁
  187. *
  188. * @return 是否成功获得锁
  189. */
  190. public boolean lockBlock() {
  191. lockValue = UUID.randomUUID().toString();
  192. while (true) {
  193. //不存在则添加 且设置过期时间(单位ms)
  194. String result = set(lockKey, lockValue, expireTime);
  195. if (OK.equalsIgnoreCase(result)) {
  196. return true;
  197. }
  198. // 每次请求等待一段时间
  199. seleep(10, 50000);
  200. }
  201. }
  202. /**
  203. * 解锁
  204. * <p>
  205. * 可以通过以下修改,让这个锁实现更健壮:
  206. * <p>
  207. * 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
  208. * 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
  209. * 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
  210. */
  211. public Boolean unlock() {
  212. // 只有加锁成功并且锁还有效才去释放锁
  213. // 只有加锁成功并且锁还有效才去释放锁
  214. if (locked) {
  215. return redisTemplate.execute(new RedisCallback<Boolean>() {
  216. @Override
  217. public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
  218. Object nativeConnection = connection.getNativeConnection();
  219. Long result = 0L;
  220. List<String> keys = new ArrayList<>();
  221. keys.add(lockKey);
  222. List<String> values = new ArrayList<>();
  223. values.add(lockValue);
  224. // 集群模式
  225. if (nativeConnection instanceof JedisCluster) {
  226. result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
  227. }
  228. // 单机模式
  229. if (nativeConnection instanceof Jedis) {
  230. result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
  231. }
  232. if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
  233. logger.info("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis());
  234. }
  235. locked = result == 0;
  236. return result == 1;
  237. }
  238. });
  239. }
  240. return true;
  241. }
  242. /**
  243. * 重写redisTemplate的set方法
  244. * <p>
  245. * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
  246. * <p>
  247. * 客户端执行以上的命令:
  248. * <p>
  249. * 如果服务器返回 OK ,那么这个客户端获得锁。
  250. * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
  251. *
  252. * @param key 锁的Key
  253. * @param value 锁里面的值
  254. * @param seconds 过去时间(秒)
  255. * @return
  256. */
  257. private String set(final String key, final String value, final long seconds) {
  258. Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空");
  259. return redisTemplate.execute(new RedisCallback<String>() {
  260. @Override
  261. public String doInRedis(RedisConnection connection) throws DataAccessException {
  262. Object nativeConnection = connection.getNativeConnection();
  263. String result = null;
  264. // 集群模式
  265. if (nativeConnection instanceof JedisCluster) {
  266. result = ((JedisCluster) nativeConnection).set(key, value, NX, EX, seconds);
  267. }
  268. // 单机模式
  269. if (nativeConnection instanceof Jedis) {
  270. result = ((Jedis) nativeConnection).set(key, value, NX, EX, seconds);
  271. }
  272. if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
  273. logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis());
  274. }
  275. return result;
  276. }
  277. });
  278. }
  279. /**
  280. * @param millis 毫秒
  281. * @param nanos 纳秒
  282. * @Title: seleep
  283. * @Description: 线程等待时间
  284. * @author yuhao.wang
  285. */
  286. private void seleep(long millis, int nanos) {
  287. try {
  288. Thread.sleep(millis, random.nextInt(nanos));
  289. } catch (InterruptedException e) {
  290. logger.info("获取分布式锁休眠被中断:", e);
  291. }
  292. }
  293. public String getLockKeyLog() {
  294. return lockKeyLog;
  295. }
  296. public void setLockKeyLog(String lockKeyLog) {
  297. this.lockKeyLog = lockKeyLog;
  298. }
  299. public int getExpireTime() {
  300. return expireTime;
  301. }
  302. public void setExpireTime(int expireTime) {
  303. this.expireTime = expireTime;
  304. }
  305. public long getTimeOut() {
  306. return timeOut;
  307. }
  308. public void setTimeOut(long timeOut) {
  309. this.timeOut = timeOut;
  310. }
  311. }

调用方式:

  1. public void redisLock3(int i) {
  2. RedisLock3 redisLock3 = new RedisLock3(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
  3. try {
  4. long now = System.currentTimeMillis();
  5. if (redisLock3.tryLock()) {
  6. logger.info("=" + (System.currentTimeMillis() - now));
  7. // TODO 获取到锁要执行的代码块
  8. logger.info("j:" + j++);
  9. } else {
  10. logger.info("k:" + k++);
  11. }
  12. } catch (Exception e) {
  13. logger.info(e.getMessage(), e);
  14. } finally {
  15. redisLock2.unlock();
  16. }
  17. }

对于这种种redis实现分布式锁的方案还是有一个问题:就是你获取锁后执行业务逻辑的代码只能在redis锁的有效时间之内,因为,redis的key到期后会自动清除,这个锁就算释放了。所以这个锁的有效时间一定要结合业务做好评估。

源码: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-data-redis-distributed-lock 工程

参考: