在多線程的環(huán)境下,為了保證一個(gè)代碼塊在同一時(shí)間只能由一個(gè)線程訪問,Java中我們一般可以使用synchronized語(yǔ)法和ReetrantLock去保證,這實(shí)際上是本地鎖的方式。但是現(xiàn)在公司都是流行分布式架構(gòu),在分布式環(huán)境下,如何保證不同節(jié)點(diǎn)的線程同步執(zhí)行呢?因此就引出了分布式鎖,它是控制分布式系統(tǒng)之間互斥訪問共享資源的一種方式。
在一個(gè)分布式系統(tǒng)中,多臺(tái)機(jī)器上部署了多個(gè)服務(wù),當(dāng)客戶端一個(gè)用戶發(fā)起一個(gè)數(shù)據(jù)插入請(qǐng)求時(shí),如果沒有分布式鎖機(jī)制保證,那么那多臺(tái)機(jī)器上的多個(gè)服務(wù)可能進(jìn)行并發(fā)插入操作,導(dǎo)致數(shù)據(jù)重復(fù)插入,對(duì)于某些不允許有多余數(shù)據(jù)的業(yè)務(wù)來說,這就會(huì)造成問題。而分布式鎖機(jī)制就是為了解決類似這類問題,保證多個(gè)服務(wù)之間互斥的訪問共享資源,如果一個(gè)服務(wù)搶占了分布式鎖,其他服務(wù)沒獲取到鎖,就不進(jìn)行后續(xù)操作。大致意思如下圖所示:
分布式鎖一般有如下的特點(diǎn):
我們一般實(shí)現(xiàn)分布式鎖有以下幾種方式:
說到Redis分布式鎖,大部分人都會(huì)想到:setnx+lua
(redis保證執(zhí)行l(wèi)ua腳本時(shí)不執(zhí)行其他操作,保證操作的原子性),或者知道set key value px milliseconds nx
。后一種方式的核心實(shí)現(xiàn)命令如下:
- 獲取鎖(unique_value可以是UUID等) SET resource_name unique_value NX PX 30000 - 釋放鎖(lua腳本中,一定要比較value,防止誤解鎖) if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
這種實(shí)現(xiàn)方式有3大要點(diǎn)(也是面試概率非常高的地方):
set key value px milliseconds nx
;事實(shí)上這類鎖最大的缺點(diǎn)就是它加鎖時(shí)只作用在一個(gè)Redis節(jié)點(diǎn)上,即使Redis通過sentinel保證高可用,如果這個(gè)master節(jié)點(diǎn)由于某些原因發(fā)生了主從切換,那么就會(huì)出現(xiàn)鎖丟失的情況:
為了避免單點(diǎn)故障問題,Redis作者antirez基于分布式環(huán)境下提出了一種更高級(jí)的分布式鎖的實(shí)現(xiàn)方式:Redlock。Redlock也是Redis所有分布式鎖實(shí)現(xiàn)方式中唯一能讓面試官高潮的方式。
antirez提出的redlock算法大概是這樣的:
在Redis的分布式環(huán)境中,我們假設(shè)有N個(gè)Redis master。這些節(jié)點(diǎn)完全互相獨(dú)立,不存在主從復(fù)制或者其他集群協(xié)調(diào)機(jī)制。我們確保將在N個(gè)實(shí)例上使用與在Redis單實(shí)例下相同方法獲取和釋放鎖?,F(xiàn)在我們假設(shè)有5個(gè)Redis master節(jié)點(diǎn),同時(shí)我們需要在5臺(tái)服務(wù)器上面運(yùn)行這些Redis實(shí)例,這樣保證他們不會(huì)同時(shí)都宕掉。
為了取到鎖,客戶端應(yīng)該執(zhí)行以下操作:
redisson已經(jīng)有對(duì)redlock算法封裝,接下來對(duì)其用法進(jìn)行簡(jiǎn)單介紹,并對(duì)核心源碼進(jìn)行分析(假設(shè)5個(gè)redis實(shí)例)。
!-- https://mvnrepository.com/artifact/org.redisson/redisson --> dependency> groupId>org.redisson/groupId> artifactId>redisson/artifactId> version>3.3.2/version> /dependency>
首先,我們來看一下redission封裝的redlock算法實(shí)現(xiàn)的分布式鎖用法,非常簡(jiǎn)單,跟重入鎖(ReentrantLock)有點(diǎn)類似:
Config config = new Config(); config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389") .setMasterName("masterName") .setPassword("password").setDatabase(0); RedissonClient redissonClient = Redisson.create(config); // 還可以getFairLock(), getReadWriteLock() RLock redLock = redissonClient.getLock("REDLOCK_KEY"); boolean isLock; try { isLock = redLock.tryLock(); // 500ms拿不到鎖, 就認(rèn)為獲取鎖失敗。10000ms即10s是鎖失效時(shí)間。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); if (isLock) { //TODO if get lock success, do something; } } catch (Exception e) { } finally { // 無論如何, 最后都要解鎖 redLock.unlock(); }
實(shí)現(xiàn)分布式鎖的一個(gè)非常重要的點(diǎn)就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源碼在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID(); String getLockName(long threadId) { return id + ":" + threadId; }
獲取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心源碼都是下面這段代碼,只不過前者獲取鎖的默認(rèn)租約時(shí)間(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
T> RFutureT> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 獲取鎖時(shí)向5個(gè)redis實(shí)例發(fā)送的命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // 首先分布式鎖的KEY不能存在,如果確實(shí)不存在,那么執(zhí)行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通過pexpire設(shè)置失效時(shí)間(也是鎖的租約時(shí)間) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果分布式鎖的KEY已經(jīng)存在,并且value也匹配,表示是當(dāng)前線程持有的鎖,那么重入次數(shù)加1,并且設(shè)置失效時(shí)間 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 獲取分布式鎖的KEY的失效時(shí)間毫秒數(shù) "return redis.call('pttl', KEYS[1]);", // 這三個(gè)參數(shù)分別對(duì)應(yīng)KEYS[1],ARGV[1]和ARGV[2] Collections.Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
獲取鎖的命令中,
釋放鎖的代碼為redLock.unlock(),核心源碼如下:
protected RFutureBoolean> unlockInnerAsync(long threadId) { // 向5個(gè)redis實(shí)例都執(zhí)行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果分布式鎖KEY不存在,那么向channel發(fā)布一條消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // 如果分布式鎖存在,但是value不匹配,表示鎖已經(jīng)被占用,那么直接返回 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + // 如果就是當(dāng)前線程占有分布式鎖,那么將重入次數(shù)減1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 重入次數(shù)減1后的值如果大于0,表示分布式鎖有重入過,那么只設(shè)置失效時(shí)間,還不能刪除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 重入次數(shù)減1后的值如果為0,表示分布式鎖只獲取過1次,那么刪除這個(gè)KEY,并發(fā)布解鎖消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", // 這5個(gè)參數(shù)分別對(duì)應(yīng)KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
下面利用SpringBoot + Jedis + AOP的組合來實(shí)現(xiàn)一個(gè)簡(jiǎn)易的分布式鎖。
自定義一個(gè)注解,被注解的方法會(huì)執(zhí)行獲取分布式鎖的邏輯
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RedisLock { /** * 業(yè)務(wù)鍵 * * @return */ String key(); /** * 鎖的過期秒數(shù),默認(rèn)是5秒 * * @return */ int expire() default 5; /** * 嘗試加鎖,最多等待時(shí)間 * * @return */ long waitTime() default Long.MIN_VALUE; /** * 鎖的超時(shí)時(shí)間單位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS; }
在AOP中我們?nèi)?zhí)行獲取分布式鎖和釋放分布式鎖的邏輯,代碼如下:
@Aspect @Component public class LockMethodAspect { @Autowired private RedisLockHelper redisLockHelper; @Autowired private JedisUtil jedisUtil; private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class); @Around("@annotation(com.redis.lock.annotation.RedisLock)") public Object around(ProceedingJoinPoint joinPoint) { Jedis jedis = jedisUtil.getJedis(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); RedisLock redisLock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); String key = redisLock.key(); try { final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit()); logger.info("isLock : {}",islock); if (!islock) { logger.error("獲取鎖失敗"); throw new RuntimeException("獲取鎖失敗"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系統(tǒng)異常"); } } finally { logger.info("釋放鎖"); redisLockHelper.unlock(jedis,key, value); jedis.close(); } } }
@Component public class RedisLockHelper { private long sleepTime = 100; /** * 直接使用setnx + expire方式獲取分布式鎖 * 非原子性 * * @param key * @param value * @param timeout * @return */ public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) { Long result = jedis.setnx(key, value); // result = 1時(shí),設(shè)置成功,否則設(shè)置失敗 if (result == 1L) { return jedis.expire(key, timeout) == 1L; } else { return false; } } /** * 使用Lua腳本,腳本中使用setnex+expire命令進(jìn)行加鎖操作 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */ public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; ListString> keys = new ArrayList>(); ListString> values = new ArrayList>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判斷是否成功 return result.equals(1L); } /** * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */ public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) { long seconds = timeUnit.toSeconds(timeout); return "OK".equals(jedis.set(key, value, "NX", "EX", seconds)); } /** * 自定義獲取鎖的超時(shí)時(shí)間 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */ public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException { long seconds = timeUnit.toSeconds(timeout); while (waitTime >= 0) { String result = jedis.set(key, value, "nx", "ex", seconds); if ("OK".equals(result)) { return true; } waitTime -= sleepTime; Thread.sleep(sleepTime); } return false; } /** * 錯(cuò)誤的解鎖方法—直接刪除key * * @param key */ public void unlock_with_del(Jedis jedis,String key) { jedis.del(key); } /** * 使用Lua腳本進(jìn)行解鎖操縱,解鎖的時(shí)候驗(yàn)證value值 * * @param jedis * @param key * @param value * @return */ public boolean unlock(Jedis jedis,String key,String value) { String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); } }
定義一個(gè)TestController來測(cè)試我們實(shí)現(xiàn)的分布式鎖
@RestController public class TestController { @RedisLock(key = "redis_lock") @GetMapping("/index") public String index() { return "index"; } }
站在巨人的肩膀上
1.Redlock:Redis分布式鎖最牛逼的實(shí)現(xiàn)
2.基于Redis的分布式鎖實(shí)現(xiàn)
到此這篇關(guān)于Redis分布式鎖升級(jí)版RedLock及SpringBoot實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Redis分布式鎖RedLock內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:楊凌 果洛 吉安 北京 朝陽(yáng) 江蘇 臺(tái)州 大慶
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Redis分布式鎖升級(jí)版RedLock及SpringBoot實(shí)現(xiàn)方法》,本文關(guān)鍵詞 Redis,分布式,鎖,升級(jí)版,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。