下面由redis教程栏目给大家介绍关于3种redis分布式锁的对比,希望对需要的朋友有所帮助!
我们通常使用的synchronized或者lock都是线程锁,对同一个jvm进程内的多个线程有效。因为锁的本质 是内存中存放一个标记,记录获取锁的线程是谁,这个标记对每个线程都可见。然而我们启动的多个订单服务,就是多个jvm,内存中的锁显然是不共享的,每个jvm进程都有自己的 锁,自然无法保证线程的互斥了,这个时候我们就需要使用到分布式锁了。常用的有三种解决方案:1.基于数据库实现 2.基于zookeeper的临时序列化节点实现 3.redis实现。本文我们介绍的就是redis的实现方式。
实现分布式锁要满足3点:多进程可见,互斥,可重入。
1) 多进程可见
redis本身就是基于jvm之外的,因此满足多进程可见的要求。
2) 互斥
即同一时间只能有一个进程获取锁标记,我们可以通过redis的setnx实现,只有第一次执行的才会成功并返回1,其它情况返回0。
释放锁
释放锁其实只需要把锁的key删除即可,使用del xxx指令。不过,如果在我们执行del之前,服务突然宕机,那么锁就永远无法删除了。所以我们可以通过setex 命令设置过期时间即可。
import java.util.uuid;import org.slf4j.logger;import org.slf4j.loggerfactory;import org.springframework.beans.factory.annotation.autowired;import org.springframework.stereotype.component;import redis.clients.jedis.jedis;import redis.clients.jedis.jedispool;/** * 第一种分布式锁 */@componentpublic class redisservice {private final logger log = loggerfactory.getlogger(this.getclass()); @autowired jedispool jedispool; // 获取锁之前的超时时间(获取锁的等待重试时间) private long acquiretimeout = 5000; // 获取锁之后的超时时间(防止死锁) private int timeout = 10000; /** * 获取分布式锁 * @return 锁标识 */ public boolean getredislock(string lockname,string val) { jedis jedis = null; try { jedis = jedispool.getresource(); // 1.计算获取锁的时间 long endtime = system.currenttimemillis() + acquiretimeout; // 2.尝试获取锁 while (system.currenttimemillis() < endtime) { // 3. 获取锁成功就设置过期时间 if (jedis.setnx(lockname, val) == 1) { jedis.expire(lockname, timeout/1000); return true; } } } catch (exception e) { log.error(e.getmessage()); } finally { returnresource(jedis); } return false; } /** * 释放分布式锁 * @param lockname 锁名称 */ public void unredislock(string lockname) { jedis jedis = null; try { jedis = jedispool.getresource(); // 释放锁 jedis.del(lockname); } catch (exception e) { log.error(e.getmessage()); } finally { returnresource(jedis); } }// =============================================== public string get(string key) { jedis jedis = null; string value = null; try { jedis = jedispool.getresource(); value = jedis.get(key); log.info(value); } catch (exception e) { log.error(e.getmessage()); } finally { returnresource(jedis); } return value; } public void set(string key, string value) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.set(key, value); } catch (exception e) { log.error(e.getmessage()); } finally { returnresource(jedis); } } /** * 关闭连接 */ public void returnresource(jedis jedis) { try { if(jedis!=null) jedis.close(); } catch (exception e) { } }}
上面的分布式锁实现了,但是这时候还可能出现另外2个问题:
一:获取锁时
setnx获取锁成功了,还没来得及setex服务就宕机了,由于这种非原子性的操作,死锁又发生了。其实redis提供了 nx 与 ex连用的命令。
二:释放锁时
1. 3个进程:a和b和c,在执行任务,并争抢锁,此时a获取了锁,并设置自动过期时间为10s
2. a开始执行业务,因为某种原因,业务阻塞,耗时超过了10秒,此时锁自动释放了
3. b恰好此时开始尝试获取锁,因为锁已经自动释放,成功获取锁
4. a此时业务执行完毕,执行释放锁逻辑(删除key),于是b的锁被释放了,而b其实还在执行业务
5. 此时进程c尝试获取锁,也成功了,因为a把b的锁删除了。
问题出现了:b和c同时获取了锁,违反了互斥性!如何解决这个问题呢?我们应该在删除锁之前,判断这个锁是否是自己设置的锁,如果不是(例如自己 的锁已经超时释放),那么就不要删除了。所以我们可以在set 锁时,存入当前线程的唯一标识!删除锁前,判断下里面的值是不是与自己标识释放一 致,如果不一致,说明不是自己的锁,就不要删除了。
/** * 第二种分布式锁 */public class redistool { private static final string lock_success = "ok"; private static final long release_success = 1l; /** * 尝试获取分布式锁 * @param jedis redis客户端 * @param lockkey 锁 * @param requestid 请求标识 * @param expiretime 超期时间 * @return 是否获取成功 */ public static boolean trygetdistributedlock(jedis jedis, string lockkey, string requestid, int expiretime) { string result = jedis.set(lockkey, requestid, "nx", "px", expiretime); if (lock_success.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param jedis redis客户端 * @param lockkey 锁 * @param requestid 请求标识 * @return 是否释放成功 */ public static boolean releasedistributedlock(jedis jedis, string lockkey, string requestid) { if (jedis.get(lockkey).equals(requestid)) { system.out.println("释放锁..." + thread.currentthread().getname() + ",identifiervalue:" + requestid); jedis.del(lockkey); return true; } return false; }}
按照上面方式实现分布式锁之后,就可以轻松解决大部分问题了。网上很多博客也都是这么实现的,但是仍然有些场景是不满足的,例如一个方法获取到锁之后,可能在方法内调这个方法此时就获取不到锁了。这个时候我们就需要把锁改进成可重入式锁了。
3) 重入锁:
也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。换一种说法:同一个线程再次进入同步代码时,可以使用自己已获取到的锁。可重入锁可以避免因同一线程中多次获取锁而导致死锁发生。像synchronized就是一个重入锁,它是通过moniter函数记录当前线程信息来实现的。实现可重入锁需要考虑两点:
获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取, 而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果锁进入了多次,在内层直接删除锁, 导致外部的业务在没有锁的情况下执行,会有安全问题。因此必须获取锁时累计重入的次数,释放时则减去重入次数,如果减到0,则可以删除锁。
下面我们假设锁的key为“ lock ”,hashkey是当前线程的id:“ threadid ”,锁自动释放时间假设为20获取锁的步骤: 1、判断lock是否存在 exists lock 2、不存在,则自己获取锁,记录重入层数为1. 2、存在,说明有人获取锁了,下面判断是不是自己的锁,即判断当前线程id作为hashkey是否存在:hexists lock threadid 3、不存在,说明锁已经有了,且不是自己获取的,锁获取失败. 3、存在,说明是自己获取的锁,重入次数+1: hincrby lock threadid 1 ,最后更新锁自动释放时间, expire lock 20 释放锁的步骤: 1、判断当前线程id作为hashkey是否存在: hexists lock threadid 2、不存在,说明锁已经失效,不用管了 2、存在,说明锁还在,重入次数减1: hincrby lock threadid -1 , 3、获取新的重入次数,判断重入次数是否为0,为0说明锁全部释放,删除key: del lock
因此,存储在锁中的信息就必须包含:key、线程标识、重入次数。不能再使用简单的key-value结构, 这里推荐使用hash结构。
获取锁的脚本(注释删掉,不然运行报错)
local key = keys[1]; -- 第1个参数,锁的keylocal threadid = argv[1]; -- 第2个参数,线程唯一标识local releasetime = argv[2]; -- 第3个参数,锁的自动释放时间if(redis.call('exists', key) == 0) then -- 判断锁是否已存在 redis.call('hset', key, threadid, '1'); -- 不存在, 则获取锁 redis.call('expire', key, releasetime); -- 设置有效期 return 1; -- 返回结果end;if(redis.call('hexists', key, threadid) == 1) then -- 锁已经存在,判断threadid是否是自己 redis.call('hincrby', key, threadid, '1'); -- 如果是自己,则重入次数+1 redis.call('expire', key, releasetime); -- 设置有效期 return 1; -- 返回结果end;return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的脚本(注释删掉,不然运行报错)
local key = keys[1]; -- 第1个参数,锁的keylocal threadid = argv[1]; -- 第2个参数,线程唯一标识if (redis.call('hexists', key, threadid) == 0) then -- 判断当前锁是否还是被自己持有 return nil; -- 如果已经不是自己,则直接返回end;local count = redis.call('hincrby', key, threadid, -1); -- 是自己的锁,则重入次数-1if (count == 0) then -- 判断是否重入次数是否已经为0 redis.call('del', key); -- 等于0说明可以释放锁,直接删除 return nil; end;
完整代码
import java.util.collections;import java.util.uuid;import org.springframework.core.io.classpathresource;import org.springframework.data.redis.core.stringredistemplate;import org.springframework.data.redis.core.script.defaultredisscript;import org.springframework.scripting.support.resourcescriptsource;/** * redis可重入锁 */public class redislock { private static final stringredistemplate redistemplate = springutil.getbean(stringredistemplate.class); private static final defaultredisscript<long> lock_script; private static final defaultredisscript<object> unlock_script; static { // 加载释放锁的脚本 lock_script = new defaultredisscript<>(); lock_script.setscriptsource(new resourcescriptsource(new classpathresource(lock.lua))); lock_script.setresulttype(long.class); // 加载释放锁的脚本 unlock_script = new defaultredisscript<>(); unlock_script.setscriptsource(new resourcescriptsource(new classpathresource(unlock.lua))); } /** * 获取锁 * @param lockname 锁名称 * @param releasetime 超时时间(单位:秒) * @return key 解锁标识 */ public static string trylock(string lockname,string releasetime) { // 存入的线程信息的前缀,防止与其它jvm中线程信息冲突 string key = uuid.randomuuid().tostring(); // 执行脚本 long result = redistemplate.execute( lock_script, collections.singletonlist(lockname), key + thread.currentthread().getid(), releasetime); // 判断结果 if(result != null && result.intvalue() == 1) { return key; }else { return null; } } /** * 释放锁 * @param lockname 锁名称 * @param key 解锁标识 */ public static void unlock(string lockname,string key) { // 执行脚本 redistemplate.execute( unlock_script, collections.singletonlist(lockname), key + thread.currentthread().getid(), null); }}
至此,一个比较完善的redis锁就开发完成了。
以上就是关于3种redis分布式锁的对比的详细内容。