針對項目中使用的分布式鎖進行簡單的示例配置以及源碼解析,並列舉源碼中使用到的一些基礎知識點,但是沒有對redisson中使用到的netty知識進行解析。
本篇主要是對以下幾個方面進行了探索。
Maven配置
RedissonLock簡單示例
源碼中使用到的Redis命令
源碼中使用到的lua腳本語義
源碼分析
Maven配置<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>2.2.12</version></dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.6.0</version></dependency>RedissonLock簡單示例redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。
redis伺服器不在本地的同學請注意權限問題。
Sentinel配置Config config = new Config();config.useSentinelServers().addSentinelAddress("127.0.0.1:6479", "127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);RedissonClient redisson = Redisson.create(config);簡單使用
RLock lock = redisson.getLock("test_lock");try{ boolean isLock=lock.tryLock(); if(isLock){ doBusiness(); }}catch(exception e){}finally{ lock.unlock();}源碼中使用到的Redis命令分布式鎖主要需要以下redis命令,這裡列舉一下。在源碼分析部分可以繼續參照命令的操作含義。
1.EXISTS key :當 key 存在,返回1;若給定的 key 不存在,返回0。
2.GETSET key value:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。
3.GET key:返回 key 所關聯的字符串值,如果 key 不存在那麼返回 nil。
4.DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。
5.HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值並返回1,如果field已有,那麼就更新value的值,並返回0.
6.HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。
7.HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定欄位field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創建。如果欄位field不存在,在進行當前操作前,其將被創建,且對應的值被置為0,返回值是增量之後的值
8.PEXPIRE key milliseconds:設置存活時間,單位是毫秒。expire操作單位是秒。
9.PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。
源碼中使用到的lua腳本語義Redisson源碼中,執行redis命令的是lua腳本,其中主要用到如下幾個概念。
redis.call() 是執行redis命令.
KEYS[1] 是指腳本中第1個參數
ARGV[1] 是指腳本中第一個參數的值
返回值中nil與false同一個意思。
需要注意的是,在redis執行lua腳本時,相當於一個redis級別的鎖,不能執行其他操作,類似於原子操作,也是redisson實現的一個關鍵點。
另外,如果lua腳本執行過程中出現了異常或者redis伺服器直接宕掉了,執行redis的根據日誌回復的命令,會將腳本中已經執行的命令在日誌中刪除。
源碼分析RLOCK結構public interface RLock extends Lock, RExpirable { void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException; boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; void lock(long leaseTime, TimeUnit unit); void forceUnlock(); boolean isLocked(); boolean isHeldByCurrentThread(); int getHoldCount(); Future<Void> unlockAsync(); Future<Boolean> tryLockAsync(); Future<Void> lockAsync(); Future<Void> lockAsync(long leaseTime, TimeUnit unit); Future<Boolean> tryLockAsync(long waitTime, TimeUnit unit); Future<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);}該接口主要繼承了Lock接口, 並擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s
RedissonLock獲取鎖 tryLock源碼Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }其中:
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock
ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s
ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當前訪問線程,用於區分不同伺服器上的線程。
逐句分析:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;if (redis.call(『exists』, KEYS[1]) == 0) 如果鎖名稱不存在
then redis.call(『hset』, KEYS[1], ARGV[2],1) 則向redis中添加一個key為test_lock的set,並且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數為1
redis.call(『pexpire』, KEYS[1], ARGV[1]) 設置set的過期時間,防止當前伺服器出問題後導致死鎖,return nil; end;返回nil 結束。
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;if (redis.call(『hexists』, KEYS[1], ARGV[2]) == 1) 如果鎖是存在的,檢測是否是當前線程持有鎖,如果是當前線程持有鎖
then redis.call(『hincrby』, KEYS[1], ARGV[2], 1)則將該線程重入的次數++
redis.call(『pexpire』, KEYS[1], ARGV[1]) 並且重新設置該鎖的有效時間
return nil; end;返回nil,結束
return redis.call('pttl', KEYS[1]);
RedissonLock解鎖 unlock源碼
鎖存在, 但不是當前線程加的鎖,則返回鎖的過期時間。@Override public void unlock() { Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } }其中:
KEYS[1] 表示的是getName() 代表鎖名test_lock
KEYS[2] 表示getChanelName() 表示的是發布訂閱過程中使用的Chanel
ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數字 0,代表解鎖消息
ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s
ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當前鎖id+線程id
語義分析:
if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call(『exists』, KEYS[1]) == 0) 如果鎖已經不存在(可能是因為過期導致不存在,也可能是因為已經解鎖)
then redis.call(『publish』, KEYS[2], ARGV[1]) 則發布鎖解除的消息
return 1; end 返回1結束
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end;if (redis.call(『hexists』, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當前線程不是加鎖的線
then return nil;end則直接返回nil 結束
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;end;if (redis.call(『hexists』, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當前線程不是加鎖的線
then return nil;end則直接返回nil 結束
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;end;local counter = redis.call(『hincrby』, KEYS[1], ARGV[3], -1) 如果是鎖是當前線程所添加,定義變量counter,表示當前線程的重入次數-1,即直接將重入次數-1
if (counter > 0)如果重入次數大於0,表示該線程還有其他任務需要執行
then redis.call(『pexpire』, KEYS[1], ARGV[2]) 則重新設置該鎖的有效時間
return 0 返回0結束
else redis.call(『del』, KEYS[1])否則表示該線程執行結束,刪除該鎖
redis.call(『publish』, KEYS[2], ARGV[1])並且發布該鎖解除的消息
return 1; end;返回1結束
return nil;
其他情況返回nil並結束if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); }腳本執行結束之後,如果返回值不是0或1,即當前線程去解鎖其他線程的加鎖時,拋出異常。
RedissonLock強制解鎖源碼@Override public void forceUnlock() { get(forceUnlockAsync()); } Future<Boolean> forceUnlockAsync() { cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage); }以上是強制解鎖的源碼,在源碼中並沒有找到forceUnlock()被調用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則並發布鎖被刪除的消息,返回1結束,否則返回0結束。
總結這裡只是簡單的一個redisson分布式鎖的測試用例,並分析了執行lua腳本這部分,如果要繼續分析執行結束之後的操作,需要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。