您好,欢迎访问一九零五行业门户网

Redis分布式锁实现的方法是什么

一、分布式锁是什么分布式锁是 满足分布式系统或集群模式下多进程可见并且互斥的锁。
基于redis实现分布式锁:
1、获取锁互斥:确保只能有一个线程获取锁;
非阻塞:尝试获取锁,成功返回true,失败返回false;
添加锁过期时间,避免服务宕机引起死锁。
set lock thread1 nx ex 10
2、释放锁手动释放;del key1
超时释放,获取锁时添加一个超时锁;
二、代码实例package com.guor.utils;import org.springframework.data.redis.core.stringredistemplate;import java.util.concurrent.timeunit;public class redislock implements ilock{ private string name; private stringredistemplate stringredistemplate; public redislock(string name, stringredistemplate stringredistemplate) { this.name = name; this.stringredistemplate = stringredistemplate; } private static final string key_prefix = "lock:"; @override public boolean trylock(long timeout) { // 获取线程唯一标识 long threadid = thread.currentthread().getid(); // 获取锁 boolean success = stringredistemplate.opsforvalue() .setifabsent(key_prefix + name, threadid+"", timeout, timeunit.seconds); // 防止拆箱的空指针异常 return boolean.true.equals(success); } @override public void unlock() { stringredistemplate.delete(key_prefix + name); }}
上面代码存在锁误删问题:如果线程1获取锁,但线程1发生了阻塞,导致redis超时释放锁;
此时,线程2尝试获取锁,成功,并执行业务;
此时,线程1重新开始执行任务,并执行完毕,执行释放锁(即删除锁);
但是,线程1删除的锁,和线程2的锁是同一把锁,这就是分布式锁误删问题;
在释放锁时,释放线程自己的分布式锁,就可以解决这个问题。
package com.guor.utils;import cn.hutool.core.lang.uuid;import org.springframework.data.redis.core.stringredistemplate;import java.util.concurrent.timeunit;public class redislock implements ilock{ private string name; private stringredistemplate stringredistemplate; public redislock(string name, stringredistemplate stringredistemplate) { this.name = name; this.stringredistemplate = stringredistemplate; } private static final string key_prefix = "lock:"; private static final string uuid_prefix = uuid.randomuuid().tostring(true) + "-"; @override public boolean trylock(long timeout) { // 获取线程唯一标识 string threadid = uuid_prefix + thread.currentthread().getid(); // 获取锁 boolean success = stringredistemplate.opsforvalue() .setifabsent(key_prefix + name, threadid, timeout, timeunit.seconds); // 防止拆箱的空指针异常 return boolean.true.equals(success); } @override public void unlock() { // 获取线程唯一标识 string threadid = uuid_prefix + thread.currentthread().getid(); // 获取锁中的标识 string id = stringredistemplate.opsforvalue().get(key_prefix + name); // 判断标示是否一致 if(threadid.equals(id)) { // 释放锁 stringredistemplate.delete(key_prefix + name); } }}
三、基于setnx实现的分布式锁存在下面几个问题1、不可重入同一个线程无法多次获取同一把锁。
2、不可重试获取锁只尝试一次就返回false,没有重试机制。
3、超时释放锁的超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
4、主从一致性如果redis是集群部署的,主从同步存在延迟,当主机宕机时,此时会选一个从作为主机,但是此时的从没有锁标识,此时,其它线程可能会获取到锁,导致安全问题。
四、redisson实现分布式锁redisson是基于redis实现的操作java内存数据网格。除了提供常用的分布式java对象,它还提供了许多分布式服务,其中包括各种分布式锁的实现。
1、pom<!--redisson--><dependency> <groupid>org.redisson</groupid> <artifactid>redisson</artifactid> <version>3.13.6</version></dependency>
2、配置类package com.guor.config;import org.redisson.redisson;import org.redisson.api.redissonclient;import org.redisson.config.config;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;@configurationpublic class redissonconfig { @bean public redissonclient redissonclient(){ // 配置 config config = new config(); /** * 单点地址usesingleserver,集群地址useclusterservers */ config.usesingleserver().setaddress("redis://127.0.0.1:6379").setpassword("123456"); // 创建redissonclient对象 return redisson.create(config); }}
3、测试类package com.guor;import lombok.extern.slf4j.slf4j;import org.junit.jupiter.api.beforeeach;import org.junit.jupiter.api.test;import org.redisson.api.rlock;import org.redisson.api.redissonclient;import org.springframework.boot.test.context.springboottest;import javax.annotation.resource;import java.util.concurrent.timeunit;@slf4j@springboottestclass redissontest { @resource private redissonclient redissonclient; private rlock lock; @beforeeach void setup() { // 获取指定名称的锁 lock = redissonclient.getlock("nezha"); } @test void test() throws interruptedexception { // 尝试获取锁 boolean islock = lock.trylock(1l, timeunit.seconds); if (!islock) { log.error("获取锁失败"); return; } try { log.info("哪吒最帅,哈哈哈"); } finally { // 释放锁 lock.unlock(); } }}
五、探索trylock源码1、trylock源码尝试获取锁public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception { // 最大等待时间 long time = unit.tomillis(waittime); long current = system.currenttimemillis(); long threadid = thread.currentthread().getid(); long ttl = this.tryacquire(waittime, leasetime, unit, threadid); if (ttl == null) { return true; } else { // 剩余等待时间 = 最大等待时间 - 获取锁失败消耗的时间 time -= system.currenttimemillis() - current; if (time <= 0l) {// 获取锁失败 this.acquirefailed(waittime, unit, threadid); return false; } else { // 再次尝试获取锁 current = system.currenttimemillis(); // subscribe订阅其它释放锁的信号 rfuture<redissonlockentry> subscribefuture = this.subscribe(threadid); // 当future在等待指定时间time内完成时,返回true if (!subscribefuture.await(time, timeunit.milliseconds)) { if (!subscribefuture.cancel(false)) { subscribefuture.oncomplete((res, e) -> { if (e == null) { // 取消订阅 this.unsubscribe(subscribefuture, threadid); } }); } this.acquirefailed(waittime, unit, threadid); return false;// 获取锁失败 } else { try { // 剩余等待时间 = 剩余等待时间 - 获取锁失败消耗的时间 time -= system.currenttimemillis() - current; if (time <= 0l) { this.acquirefailed(waittime, unit, threadid); boolean var20 = false; return var20; } else { boolean var16; do { long currenttime = system.currenttimemillis(); // 重试获取锁 ttl = this.tryacquire(waittime, leasetime, unit, threadid); if (ttl == null) { var16 = true; return var16; } // 再次失败了,再看一下剩余时间 time -= system.currenttimemillis() - currenttime; if (time <= 0l) { this.acquirefailed(waittime, unit, threadid); var16 = false; return var16; } // 再重试获取锁 currenttime = system.currenttimemillis(); if (ttl >= 0l && ttl < time) { // 通过信号量的方式尝试获取信号,如果等待时间内,依然没有结果,会返回false ((redissonlockentry)subscribefuture.getnow()).getlatch().tryacquire(ttl, timeunit.milliseconds); } else { ((redissonlockentry)subscribefuture.getnow()).getlatch().tryacquire(time, timeunit.milliseconds); } time -= system.currenttimemillis() - currenttime; } while(time > 0l); this.acquirefailed(waittime, unit, threadid); var16 = false; return var16; } } finally { this.unsubscribe(subscribefuture, threadid); } } } }}
2、重置锁的有效期private void scheduleexpirationrenewal(long threadid) { redissonlock.expirationentry entry = new redissonlock.expirationentry(); // this.getentryname():锁的名字,一个锁对应一个entry // putifabsent:如果不存在,将锁和entry放到map里 redissonlock.expirationentry oldentry = (redissonlock.expirationentry)expiration_renewal_map.putifabsent(this.getentryname(), entry); if (oldentry != null) { // 同一个线程多次获取锁,相当于重入 oldentry.addthreadid(threadid); } else { // 如果是第一次 entry.addthreadid(threadid); // 更新有效期 this.renewexpiration(); }}
更新有效期,递归调用更新有效期,永不过期
private void renewexpiration() { // 从map中得到当前锁的entry redissonlock.expirationentry ee = (redissonlock.expirationentry)expiration_renewal_map.get(this.getentryname()); if (ee != null) { // 开启延时任务 timeout task = this.commandexecutor.getconnectionmanager().newtimeout(new timertask() { public void run(timeout timeout) throws exception { redissonlock.expirationentry ent = (redissonlock.expirationentry)redissonlock.expiration_renewal_map.get(redissonlock.this.getentryname()); if (ent != null) { // 取出线程id long threadid = ent.getfirstthreadid(); if (threadid != null) { // 刷新有效期 rfuture<boolean> future = redissonlock.this.renewexpirationasync(threadid); future.oncomplete((res, e) -> { if (e != null) { redissonlock.log.error("can't update lock " + redissonlock.this.getname() + " expiration", e); } else { if (res) { // 递归调用更新有效期,永不过期 redissonlock.this.renewexpiration(); } } }); } } } }, this.internallockleasetime / 3l, timeunit.milliseconds);// 10s ee.settimeout(task); }}
更新有效期protected rfuture<boolean> renewexpirationasync(long threadid) { return this.evalwriteasync(this.getname(), longcodec.instance, rediscommands.eval_boolean, // 判断当前线程的锁是否是当前线程 "if (redis.call('hexists', keys[1], argv[2]) == 1) then // 更新有效期 redis.call('pexpire', keys[1], argv[1]); return 1; end; return 0;", collections.singletonlist(this.getname()), this.internallockleasetime, this.getlockname(threadid));}
3、调用lua脚本<t> rfuture<t> trylockinnerasync(long waittime, long leasetime, timeunit unit, long threadid, redisstrictcommand<t> command) { // 锁释放时间 this.internallockleasetime = unit.tomillis(leasetime); return this.evalwriteasync(this.getname(), longcodec.instance, command, // 判断锁成功 "if (redis.call('exists', keys[1]) == 0) then redis.call('hincrby', keys[1], argv[2], 1); // 如果不存在,记录锁标识,次数+1 redis.call('pexpire', keys[1], argv[1]); // 设置锁有效期 return nil; // 相当于java的null end; if (redis.call('hexists', keys[1], argv[2]) == 1) then redis.call('hincrby', keys[1], argv[2], 1); // 如果存在,判断锁标识是否是自己的,次数+1 redis.call('pexpire', keys[1], argv[1]); // 设置锁有效期 return nil; end; // 判断锁失败,pttl:指定锁剩余有效期,单位毫秒,keys[1]:锁的名称 return redis.call('pttl', keys[1]);", collections.singletonlist(this.getname()), this.internallockleasetime, this.getlockname(threadid));}
六、释放锁unlock源码1、取消更新任务public rfuture<void> unlockasync(long threadid) { rpromise<void> result = new redissonpromise(); rfuture<boolean> future = this.unlockinnerasync(threadid); future.oncomplete((opstatus, e) -> { // 取消更新任务 this.cancelexpirationrenewal(threadid); if (e != null) { result.tryfailure(e); } else if (opstatus == null) { illegalmonitorstateexception cause = new illegalmonitorstateexception("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadid); result.tryfailure(cause); } else { result.trysuccess((object)null); } }); return result;}
2、删除定时任务void cancelexpirationrenewal(long threadid) { // 从map中取出当前锁的定时任务entry redissonlock.expirationentry task = (redissonlock.expirationentry)expiration_renewal_map.get(this.getentryname()); if (task != null) { if (threadid != null) { task.removethreadid(threadid); } // 删除定时任务 if (threadid == null || task.hasnothreads()) { timeout timeout = task.gettimeout(); if (timeout != null) { timeout.cancel(); } expiration_renewal_map.remove(this.getentryname()); } }}
以上就是redis分布式锁实现的方法是什么的详细内容。
其它类似信息

推荐信息