Redis 是一個開源(BSD許可)的,內存中的數據結構存儲系統,它可以用作資料庫、緩存和消息中間件。 它支持多種類型的數據結構,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 等。
redis分布式鎖三板斧,獲取鎖、刪除鎖、鎖超時
Redis是最常見的實現分布式鎖的方法之一,而很多人都了解使用了redis分布式鎖使用redis的
SET key value [EX seconds] [PX milliseconds] [NX|XX]
指令。
對於刪除操作,由於先判斷key是否存在,然後執行del操作,為避免刪除其他線程生成的鎖,因此需要執行使用lua腳本執行,保證原子性,腳本如下
-- lua刪除鎖:-- KEYS和ARGV分別是以集合方式傳入的參數 key為鎖名稱,argv為每個鎖的唯一表示if redis.call('get', KEYS[1]) == ARGV[1] then -- 執行刪除操作 return redis.call('del', KEYS[1]) else -- 不成功,返回0 return 0 end
上面實現的過程中,我們對於設置key的超時時間的設置怎麼處理?一旦我們設置太短,業務代碼耗時過長,則會被超時釋放;如果設置的太長,萬一線程獲得鎖後線程異常或死亡,未能正常釋放鎖,容易導致業務積壓。
有人說我們可以預先計算業務耗時,設置一個最長的,其實這個看似很合理,但實際上,作為一款分布式鎖的基礎服務,本應與業務脫離,需要保證未來業務場景,同時簡化使用方式。
其實,redisson就針對這樣的問題提供了解決方案,那就是watch dog(看門狗),下面我們看下redisson的源碼。
watch dog即通過開啟一個線程進行key的續期操作。
先看redisson的demo使用,如下
// 初始化Config config = new Config();// 由於本地使用單機模式,其他模式也config.useSingleServer().setAddress("127.0.0.1:6379");RedissonClient redisson = Redisson.create(config);// 獲取鎖RLock lock = redisson.getLock("lockkey");lock.lock(30, TimeUnit.SECONDS);
那麼我們直接從redisson.getLock去往下跟蹤代碼,為了簡化,我們直接看到最後的watch dog的代碼(一步一步跟源碼,肯定會看到這段代碼)
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1L) { // 獲取鎖 return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // watchdog RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } }
此時我們看核心代碼tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { this.internalLockLeaseTime = unit.toMillis(leaseTime); return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, //如果鎖不存在,創建鎖,並返回空 "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; //如果鎖存在,且value匹配,說明是當前線程持有的鎖,對key增量加1(重入次數+1),返回空 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; //申請鎖失敗,返回鎖的剩餘時間 return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
上面可以看到如果有鎖會返回nil,然後會看到最上面的代碼,會判斷,如果為null,會進行scheduleExpirationRenewal方法調用,如下
ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { if (ttlRemaining == null) { this.scheduleExpirationRenewal(threadId); } } });
scheduleExpirationRenewal方法內容如下:
protected RFuture<Boolean> renewExpirationAsync(long threadId) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果有鎖則續期 30s(源碼中初始化 this.lockWatchdogTimeout = 30000L) "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)}); }
此時我們終於看到了續期操作,了解了watch dog的原理。