上一篇已經簡單介紹了,redisson 提供的讀寫鎖 RReadWriteLock 的使用demo、使用場景、和RedissonLock 的關係;也深入分析了讀鎖 RedissonReadLock 加鎖 lua 腳本的執行邏輯、watchdog 機制 lua 腳本的執行邏輯。
下面,我們將繼續分析讀鎖 RedissonReadLock 釋放鎖時,lua 腳本是怎麼執行的。
1、RedissonReadLock 釋放鎖 lua 腳本分析分析前,我們定一下加鎖的key:
RReadWriteLock readWriteLock = client.getReadWriteLock("myLock");RedissonReadLock#unlockInnerAsync:
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}我們可以看到,這讀鎖釋放鎖的lua腳本還是比較長的,但是我們也不用著急,一步一步分析就可以了。
1.1、KEYSArrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix):
getName(): 鎖key
getChannelName():prefixName("redisson_rwlock", getName()) -> redisson_rwlock:{myLock}
timeoutPrefix:getReadWriteTimeoutNamePrefix(threadId) -> suffixName(getName(), getLockName(threadId)) + ":rwlock_timeout" -> {myLock}:UUID-1:threadId-1:rwlock_timeout
keyPrefix:getKeyPrefix(threadId, timeoutPrefix) -> timeoutPrefix.split(":" + getLockName(threadId))[0] -> {myLock}
KEYS:["myLock","redisson_rwlock:{myLock}","{myLock}:UUID-1:threadId-1:rwlock_timeout","{myLock}"]
1.2、ARGVSLockPubSub.UNLOCK_MESSAGE, getLockName(threadId)
ARGVS:[0L,"UUID:threadId"]
1.3、lua 腳本分析1、分支一:鎖模式不存在,往鎖對應的channel發送消息場景:
如果鎖模式不存在,那麼證明沒有線程持有讀寫鎖
當前線程即使沒有持有鎖,但還是調用了釋放鎖的方法
lua腳本:
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; "分析:
利用 hget 命令獲取讀寫鎖的模式
hget myLock mode如果鎖模式為空,往讀寫鎖對應的channel發送釋放鎖的消息,然後返回1,lua腳本執行完畢
publish redisson_rwlock:{myLock} 0channel 發布鎖釋放消息的用處:
其實在 Redisson 提供的各種分布式鎖中,不管是可重入鎖、公平鎖,還是到現在的讀寫鎖,都會利用 redis 的pub/sub機制來做下面的通知機制。
在線程獲取鎖失敗的時候,在等待前會先訂閱鎖對應的 channel,然後進入等待狀態。
如果當前線程成功釋放鎖,那麼會在鎖對應的 channel 發布釋放鎖的消息;假設此時有其他線程在等待獲取鎖,那麼就會接收到 channel 裡釋放鎖的消息,提前跳出等待狀態,去獲取鎖。
關於鎖channel,要注意的是:可重入鎖和讀寫鎖,等待線程都是訂閱同一個channel;而公平鎖不是,公平鎖是每個等待線程都訂閱自己指定的channel,從而做到公平。
2、分支二:鎖存在,但當前線程沒有持有鎖場景:
lua腳本:
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; "分析:
利用 hexists 命令判斷當前線程是否持有鎖
hexists myLock UUID-1:threadId-1如果不存在直接返回null,表示釋放鎖失敗
3、分支三:鎖存在且當前線程持有鎖場景:
lua腳本:
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"if (counter == 0) then " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); "分析: 分析前,我們假設線程 UUID-1:threadId-1 持有兩個讀鎖,那麼 redis 中鎖相關數據如下:
myLock:{
"mode":"read",
"UUID-1:threadId-1":2
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1
{myLock}:UUID-1:threadId-1:rwlock_timeout:2 1利用 hincrby 命令,給當前線程持有鎖數量減1
hincrby myLock UUID-1:threadId-1 -1如果持有鎖數量減1後等於0,證明當前線程不再持有鎖,那麼利用 hdel 命令將鎖map中加鎖次數記錄刪掉
hdel myLock UUID:threadId刪除線程持有鎖對應的加鎖超時記錄
del {myLock}:UUID-1:threadId-1:rwlock_timeout:count+1分析後,由於我們假設了當前線程持有鎖兩次,所以只會執行步驟1和步驟3,執行後redis 中鎖相關數據如下:
myLock:{
"mode":"read",
"UUID-1:threadId-1":1
}
{myLock}:UUID-1:threadId-1:rwlock_timeout:1 1到這裡,我們可能第一時間會有點疑惑:為什麼給讀鎖扣減不需要先判斷鎖的模式?
其實在前一篇文章中,我們就提及到兩點:
在鎖map中記錄加鎖次數時,讀鎖的key是UUID:threadId,即客戶端ID:線程ID;而寫鎖的key是UUID:threadId:write,即客戶端ID:線程ID:write,那麼就是說讀鎖的key和寫鎖的key是不一樣的。所以解鎖的時候,直接使用對應key來扣減持有鎖次數即可。
還有一點很重要的是,相同線程,如果獲取了寫鎖後,還是可以繼續獲取讀鎖的。所以只需要判斷鎖map有讀鎖加鎖次數記錄即可,就可以判斷當前線程是持有讀鎖的,並不需要關心當前鎖的模式。
4、分支四:給當前鎖刷新過期時間場景:
lua腳本:
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; "分析:
利用 hlen 獲取鎖map中key的數量
hlen myLock如果鎖map中 key 的數量還是大於1,那麼證明還有線程持有鎖,遍歷鎖map集合中的加鎖次數key,根據加鎖超時記錄獲取最大的超時時間
hkeys myLock拼接 key 對應的加鎖記錄對應的超時時間,利用 pttl 獲取超時時間
與 maxRemainTime 對比,獲取當前最大的超時時間,賦值給 maxRemainTime
利用 hget 命令獲取key對應的加鎖次數
遍歷步驟2獲取到的keys
hget myLock key遍歷加鎖次數
pttl {myLock}:UUID-1:threadId-1:rwlock_timeout:num -> remainTimemaxRemainTime = math.max(remainTime, maxRemainTime)設置 maxRemainTime 為 -3
利用 hkeys 命令獲取鎖map中所有key
判斷 maxRemainTime 是否大於0,如果大於0,給鎖重新設置過期時間為 maxRemainTime,然後返回0結束lua腳本的執行
pexpire myLock maxRemainTime這裡我們會思考一個問題,為什麼鎖map中的key都大於1了,證明肯定還有線程持有鎖,那為什麼還會存在 maxRemainTime 最後小於0的情況呢?
有一個點我們還沒學到,那就是其實讀寫鎖中,如果是獲取寫鎖,並不會新增一條寫鎖的超時記錄,因為讀寫鎖中,寫鎖和寫鎖是互斥的,寫鎖和讀鎖也是互斥的,即使支持當前線程先獲取寫鎖再獲取讀鎖,其實也不需要增加一條寫鎖的超時時間,因為讀寫鎖 key 的超時時間就等於寫鎖的超時時間。
如果當前讀寫鎖的鎖模式是寫鎖,直接返回0結束lua腳本的執行
5、最後操作場景:
lua腳本:
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "分析:
利用 del 命令刪除讀寫鎖對應的 key
del myLock往讀寫鎖對應的channel發布鎖釋放消息
publish redisson_rwlock:{myLock} 0
二、最後到此,關於讀寫鎖 RReadWriteLock 中讀鎖 RedissonReadLock 釋放鎖的原理已經分析完了,至於釋放鎖後續的停止 watchdog 的執行等操作,還是和 RedissLock 保持一致,我們就不再分析了。
同時我們可以發現,其實分析讀鎖時很多地方還是需要同時知道寫鎖的部分原理,我們這裡只是提前透露了一些,用於支撐整個讀鎖釋放鎖到原理分析。不過接下來,也將會對寫鎖 RedissonWriteLock 加鎖和釋放鎖的原理進行分析,在梳理過程中,也會帶上一定的場景來分析,儘量做到完全理解為啥 Redisson 要這麼做~