Redisson 如何實現分布式鎖

2021-12-29 Java後端進階

針對項目中使用的分布式鎖進行簡單的示例配置以及源碼解析,並列舉源碼中使用到的一些基礎知識點,但是沒有對redisson中使用到的netty知識進行解析。

本篇主要是對以下幾個方面進行了探索。

Maven配置

RedissonLock簡單示例

源碼中使用到的Redis命令

源碼中使用到的lua腳本語義

源碼分析

Maven配置
<dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson</artifactId>    <version>2.2.12</version></dependency><dependency>    <groupId>com.fasterxml.jackson.core</groupId>    <artifactId>jackson-annotations</artifactId>    <version>2.6.0</version></dependency>

RedissonLock簡單示例

redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。

redis伺服器不在本地的同學請注意權限問題。

Sentinel配置
Config config = new Config();config.useSentinelServers().addSentinelAddress("127.0.0.1:6479", "127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);RedissonClient redisson = Redisson.create(config);

簡單使用

RLock lock = redisson.getLock("test_lock");try{    boolean isLock=lock.tryLock();    if(isLock){        doBusiness();    }}catch(exception e){}finally{    lock.unlock();}

源碼中使用到的Redis命令

分布式鎖主要需要以下redis命令,這裡列舉一下。在源碼分析部分可以繼續參照命令的操作含義。

1.EXISTS key :當 key 存在,返回1;若給定的 key 不存在,返回0。

2.GETSET key value:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。

3.GET key:返回 key 所關聯的字符串值,如果 key 不存在那麼返回 nil。

4.DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。

5.HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值並返回1,如果field已有,那麼就更新value的值,並返回0.

6.HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。

7.HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定欄位field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創建。如果欄位field不存在,在進行當前操作前,其將被創建,且對應的值被置為0,返回值是增量之後的值

8.PEXPIRE key milliseconds:設置存活時間,單位是毫秒。expire操作單位是秒。

9.PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。

源碼中使用到的lua腳本語義

Redisson源碼中,執行redis命令的是lua腳本,其中主要用到如下幾個概念。

redis.call() 是執行redis命令.

KEYS[1] 是指腳本中第1個參數

ARGV[1] 是指腳本中第一個參數的值

返回值中nil與false同一個意思。

需要注意的是,在redis執行lua腳本時,相當於一個redis級別的鎖,不能執行其他操作,類似於原子操作,也是redisson實現的一個關鍵點。

另外,如果lua腳本執行過程中出現了異常或者redis伺服器直接宕掉了,執行redis的根據日誌回復的命令,會將腳本中已經執行的命令在日誌中刪除。

源碼分析RLOCK結構
public interface RLock extends Lock, RExpirable {    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;    void lock(long leaseTime, TimeUnit unit);    void forceUnlock();    boolean isLocked();    boolean isHeldByCurrentThread();    int getHoldCount();    Future<Void> unlockAsync();    Future<Boolean> tryLockAsync();    Future<Void> lockAsync();    Future<Void> lockAsync(long leaseTime, TimeUnit unit);    Future<Boolean> tryLockAsync(long waitTime, TimeUnit unit);    Future<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);}

該接口主要繼承了Lock接口, 並擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s

RedissonLock獲取鎖 tryLock源碼
Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {       internalLockLeaseTime = unit.toMillis(leaseTime);       return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,                 "if (redis.call('exists', KEYS[1]) == 0) then " +                     "redis.call('hset', KEYS[1], ARGV[2], 1); " +                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                     "return nil; " +                 "end; " +                 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                     "return nil; " +                 "end; " +                 "return redis.call('pttl', KEYS[1]);",                   Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));   }

其中:

KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock

ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s

ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當前訪問線程,用於區分不同伺服器上的線程。

逐句分析:

if (redis.call('exists', KEYS[1]) == 0) then          redis.call('hset', KEYS[1], ARGV[2], 1);          redis.call('pexpire', KEYS[1], ARGV[1]);          return nil;         end;if (redis.call('exists', KEYS[1]) == 0) then          redis.call('hset', KEYS[1], ARGV[2], 1);          redis.call('pexpire', KEYS[1], ARGV[1]);          return nil;         end;

if (redis.call(『exists』, KEYS[1]) == 0) 如果鎖名稱不存在

then redis.call(『hset』, KEYS[1], ARGV[2],1) 則向redis中添加一個key為test_lock的set,並且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數為1

redis.call(『pexpire』, KEYS[1], ARGV[1]) 設置set的過期時間,防止當前伺服器出問題後導致死鎖,return nil; end;返回nil 結束。

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then          redis.call('hincrby', KEYS[1], ARGV[2], 1);          redis.call('pexpire', KEYS[1], ARGV[1]);         return nil;          end;

if (redis.call(『hexists』, KEYS[1], ARGV[2]) == 1) 如果鎖是存在的,檢測是否是當前線程持有鎖,如果是當前線程持有鎖

then redis.call(『hincrby』, KEYS[1], ARGV[2], 1)則將該線程重入的次數++

redis.call(『pexpire』, KEYS[1], ARGV[1]) 並且重新設置該鎖的有效時間

return nil; end;返回nil,結束

return redis.call('pttl', KEYS[1]);
鎖存在, 但不是當前線程加的鎖,則返回鎖的過期時間。

RedissonLock解鎖 unlock源碼
@Override    public void unlock() {        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                        "if (redis.call('exists', KEYS[1]) == 0) then " +                            "redis.call('publish', KEYS[2], ARGV[1]); " +                            "return 1; " +                        "end;" +                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                            "return nil;" +                        "end; " +                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +                        "if (counter > 0) then " +                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +                            "return 0; " +                        "else " +                            "redis.call('del', KEYS[1]); " +                            "redis.call('publish', KEYS[2], ARGV[1]); " +                            "return 1; "+                        "end; " +                        "return nil;",                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));        if (opStatus == null) {            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "                    + id + " thread-id: " + Thread.currentThread().getId());        }        if (opStatus) {            cancelExpirationRenewal();        }    }

其中:

KEYS[1] 表示的是getName() 代表鎖名test_lock

KEYS[2] 表示getChanelName() 表示的是發布訂閱過程中使用的Chanel

ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數字 0,代表解鎖消息

ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s

ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當前鎖id+線程id

語義分析:

if (redis.call('exists', KEYS[1]) == 0) then         redis.call('publish', KEYS[2], ARGV[1]);         return 1;         end;

if (redis.call(『exists』, KEYS[1]) == 0) 如果鎖已經不存在(可能是因為過期導致不存在,也可能是因為已經解鎖)

then redis.call(『publish』, KEYS[2], ARGV[1]) 則發布鎖解除的消息

return 1; end 返回1結束

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then          return nil;         end;

if (redis.call(『hexists』, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當前線程不是加鎖的線

then return nil;end則直接返回nil 結束

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);if (counter > 0) then         redis.call('pexpire', KEYS[1], ARGV[2]);          return 0;else         redis.call('del', KEYS[1]);         redis.call('publish', KEYS[2], ARGV[1]);         return 1;end;

if (redis.call(『hexists』, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當前線程不是加鎖的線

then return nil;end則直接返回nil 結束

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);if (counter > 0) then         redis.call('pexpire', KEYS[1], ARGV[2]);          return 0;else         redis.call('del', KEYS[1]);         redis.call('publish', KEYS[2], ARGV[1]);         return 1;end;

local counter = redis.call(『hincrby』, KEYS[1], ARGV[3], -1) 如果是鎖是當前線程所添加,定義變量counter,表示當前線程的重入次數-1,即直接將重入次數-1

if (counter > 0)如果重入次數大於0,表示該線程還有其他任務需要執行

then redis.call(『pexpire』, KEYS[1], ARGV[2]) 則重新設置該鎖的有效時間

return 0 返回0結束

else redis.call(『del』, KEYS[1])否則表示該線程執行結束,刪除該鎖

redis.call(『publish』, KEYS[2], ARGV[1])並且發布該鎖解除的消息

return 1; end;返回1結束

return nil;
其他情況返回nil並結束

if (opStatus == null) {            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "                    + id + " thread-id: " + Thread.currentThread().getId());        }

腳本執行結束之後,如果返回值不是0或1,即當前線程去解鎖其他線程的加鎖時,拋出異常。

RedissonLock強制解鎖源碼
@Override    public void forceUnlock() {        get(forceUnlockAsync());    }    Future<Boolean> forceUnlockAsync() {        cancelExpirationRenewal();        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,                "if (redis.call('del', KEYS[1]) == 1) then "                + "redis.call('publish', KEYS[2], ARGV[1]); "                + "return 1 "                + "else "                + "return 0 "                + "end",                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);    }

以上是強制解鎖的源碼,在源碼中並沒有找到forceUnlock()被調用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則並發布鎖被刪除的消息,返回1結束,否則返回0結束。

總結

這裡只是簡單的一個redisson分布式鎖的測試用例,並分析了執行lua腳本這部分,如果要繼續分析執行結束之後的操作,需要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。

相關焦點

  • redisson分布式讀寫鎖-讀鎖watchDog原理
    redisson分布式鎖文章回顧redisson分布式可重入鎖加鎖原理
  • 基於Redisson分布式鎖解決秒殺系統的「超賣」問題
    Redis基礎上的一款綜合中間件,除了擁有Redis本身提供的強大功能外,還提供了諸如分布式鎖、分布式服務、延遲隊列、遠程調用等強大的功能。Redisson開源地址為https://github.com/redisson/redisson/wiki/目錄
  • Redisson 分布式鎖實戰與watch dog機制解讀
    :互斥:在分布式高並發的條件下,需要保證,同一時刻只能有一個線程獲得鎖,這是最最基本的一點。防止死鎖:在分布式高並發的條件下,比如有個線程獲得鎖的同時,還沒有來得及去釋放鎖,就因為系統故障或者其它原因使它無法執行釋放鎖的命令,導致其它線程都無法獲得鎖,造成死鎖。可重入:我們知道ReentrantLock是可重入鎖,那它的特點就是同一個線程可以重複拿到同一個資源的鎖。
  • Redisson實現Redis分布式鎖的N種姿勢
    前幾天發的一篇文章《Redlock:Redis分布式鎖最牛逼的實現》,引起了一些同學的討論,也有一些同學提出了一些疑問,這是好事兒。本文在講解如何使用Redisson實現Redis普通分布式鎖,以及Redlock算法分布式鎖的幾種方式的同時,也附帶解答這些同學的一些疑問。
  • Redisson分布式鎖學習總結:讀鎖 RedissonReadLock#unLock 釋放鎖源碼分析
    收錄於話題 #分布式鎖then " + "return 0;" + "end; " + "end; " + "redis.call('del', KEYS[1]); " + "redis.call('publish
  • 實戰:Redis集群環境下的-RedLock(真分布式鎖)
    每天為您推送優質技術文章在不同進程需要互斥地訪問共享資源時,分布式鎖是一種非常有用的技術手段。 有很多三方庫和文章描述如何用Redis實現一個分布式鎖管理器,但是這些庫實現的方式差別很大,而且很多簡單的實現其實只需採用稍微增加一點複雜的設計就可以獲得更好的可靠性。
  • Redisson分布式鎖學習總結:寫鎖 RedissonWriteLock#unLock 釋放鎖源碼分析
    釋放鎖的源碼,我們這裡也是不再做分析了,因為 RedissonWriteLock 也是基於 RedissonLock 做的擴展,所以釋放鎖的源碼也是和 RedissonLock 保持一致的,我們這裡只需要分析 lua 腳本是如何執行即可。
  • 分布式鎖中的王者方案 - Redisson
    上篇講解了如何用 Redis 實現分布式鎖的五種方案,但我們還是有更優的王者方案,就是用 Redisson。分布式鎖:Redisson還實現了Redis文檔中提到像分布式鎖Lock這樣的更高階應用場景。
  • 分布式鎖解決方案-Redis
    ## 為什麼要學習分布式鎖解決方案為了解決分布式架構帶來的數據準確性問題!我們用synchronized或者 ReentrantLock(瑞恩吹特) 能解決問題嗎?真實生產環境我們採用集群的方式去訪問秒殺商品(nginx為我們做了負載均衡)。
  • 分布式鎖(Redisson)-從零開始,深入理解與不斷優化
    答案是不能,如上圖所示,請求經Nginx分發後,可能存在多個服務同時從Redis中獲取庫存數據,此時只加synchronized (單機鎖)是無效的,並發越高,出現問題的機率就越大。案例3-使用SETNX實現分布式鎖setnx:將 key 的值設為 value,若且唯若 key 不存在。
  • Redisson 分布式鎖詳解與可視化監控方案
    點擊上方「陶陶技術筆記」關注我回復「資料」獲取作者整理的大量學習資料!
  • 如何優雅地用Redis實現分布式鎖?
    什麼是分布式鎖在學習Java多線程編程的時候,鎖是一個很重要也很基礎的概念,鎖可以看成是多線程情況下訪問共享資源的一種線程同步機制。這是對於單進程應用而言的,即所有線程都在同一個JVM進程裡的時候,使用Java語言提供的鎖機制可以起到對共享資源進行同步的作用。
  • Redis如何實現分布式鎖?
    如果是單機應用,直接使用本地鎖就可以避免。如果是分布式應用,本地鎖派不上用場,這時就需要引入分布式鎖來解決。由此可見分布式鎖的目的其實很簡單,就是為了保證多臺伺服器在執行某一段代碼時保證只有一臺伺服器執行。
  • 利用Redis 實現分布式鎖
    對於這個問題,我們可以簡單將鎖分為兩種——內存級鎖以及分布式鎖,內存級鎖即我們在 Java 中的 synchronized 關鍵字(或許加上進程級鎖修飾更恰當些),而分布式鎖則是應用在分布式系統中的一種鎖機制。
  • Redlock:Redis分布式鎖最牛逼的實現
    普通實現說道Redis分布式鎖大部分人都會想到:setnx+lua,或者知道set key value px milliseconds nx。正因為如此,Redis作者antirez基於分布式環境下提出了一種更高級的分布式鎖的實現方式:Redlock。筆者認為,Redlock也是Redis所有分布式鎖實現方式中唯一能讓面試官高潮的方式。
  • 一文掌握 Redisson 分布式鎖原理(值得收藏)
    ReentrantLock 保證了 JVM 共享資源同一時刻只允許單個線程進行操作實現思路ReentrantLock 內部公平鎖與非公平鎖繼承了 AQS[AbstractQueuedSynchronizer]
  • Redisson分布式鎖學習總結:寫鎖 RedissonWriteLock#lock 獲取鎖源碼分析
    收錄於話題 #分布式鎖場景:lua腳本:"if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2]
  • Java都為我們提供了各種鎖,為什麼還需要分布式鎖?
    這篇文章主要是針對為什麼需要使用分布式鎖這個話題來展開討論的。前一段時間在群裡有個兄弟問,既然分布式鎖能解決大部分生產問題,那麼java為我們提供的那些鎖有什麼用呢?直接使用分布式鎖不就結了嘛。針對這個問題我想了很多,一開始是在網上找找看看有沒有類似的回答。後來想了想。想要解決這個問題,還需要從本質上來分析。OK,開始上車出發。
  • 手把手教你實現基於Redis的分布式鎖
    在介紹基於Redis實現的分布式鎖之前;以Python語言為例,我們看看根據應用的實現架構,同步鎖可能會有以下幾種類型:如果處理程序是單進程多線程的,在Python語言中,就可以使用 threading 模塊的 Lock對象來限制對共享資源的同步訪問,實現多線程安全。
  • Spring Integration實現分布式鎖
    點擊上方藍色字體,選擇「標星公眾號」優質文章,第一時間送達  作者 |  僅此而已-遠方來源 |  urlify.cn/b2umMv66套java從入門到精通實戰課程分享 學習本篇之前,可以先看下文章 什麼是分布式鎖