經常聽到分布式鎖這個名詞,覺得高大上又離我很遙遠,於是也沒有進行什麼深究。直到面試被反覆問道,我覺得有必要了解一下,以跟上這個時代的節奏
常見的分布式鎖實現,一個是ZooKeeper,另一個就是Redis了。Redis這麼熟悉,當然是拿它先開刀了。在進行鎖的實現的時候,有幾個基本概念,需要先說明一下。
第一個就是鎖的概念,這個鎖不像java或者jvm中那種複雜又難懂的鎖,你可以把鎖理解為一個值,根據這個值的存在與否,來決定是否獲取到鎖
在Redis中,有一種稱為樂觀鎖的東西,什麼叫樂觀鎖,意思就是這種策略是樂觀的,總是樂觀的認為別的客戶端連接不會修改該值,因此除了本客戶端連接可以讀取到某種資源(存儲在Redis上的鍵值對信息),其它客戶端連接也可以讀取到。那萬一其餘客戶端連接修改了資源呢?這個時候Redis Server會給加了樂觀鎖的客戶端連接發送消息,訴你該數據已經修改過了,這個時候本客戶端連接可以選擇循環重試或者直接退出。這個聽起來是不是像CAS的操作呢?我的感覺是像極了。具體涉及的命令就是watch unwatch multi exec,這些命令什麼意思,下文詳細說明
另一種鎖稱之為悲觀鎖。什麼是悲觀鎖呢?就是客戶端連接總是認為數據隨時都可能被其餘客戶端連接修改掉,所以加了悲觀鎖,其餘的客戶端連接在此刻是無法讀取到資源,自然也無法進行操作。當本客戶端連接釋放了鎖,其餘的客戶端連接才可以獲取鎖,進而進行操作。涉及的命令不僅包括watch unwatch multi exec,也包括setnx ttl expire等命令,同樣後文細說。
不像java中已經提供了各種鎖以及同步容器,並發工具,Redis本身它不提供任何鎖的實現,也沒有樂觀鎖、悲觀鎖,超時鎖等東西,這需要我們組合使用Redis的各種原子命令以及不同的命令特性,來自己實現鎖,同時在代碼層面一致的使用和釋放鎖。
再來說一下Redis客戶端。Redis是用c語言寫的,要麼使用自帶的redis-cli進行連接和交互,要麼使用根據通信協議封裝好的不同語言的客戶端庫。Redis的java客戶端庫很多,常用的有Jedis、Lettuce 。並且Spring SpringBoot默認的就是支持這兩種客戶端,所以學習了這兩個客戶端對以後也很有幫助。Lettuce 的功能更加強大,它基於netty,支持Redis的哨兵模式和集群模式。但是由於我知識淺薄,只能拿Jedis來作為演示啦。
由於樂觀鎖會導致無謂的修改循環重試,導致很少能夠修改成功,耗費資源。而悲觀鎖,雖然在獲取鎖時不斷重試,但對於修改資源,卻是一次就成功了,在資源競爭嚴重的時候,悲觀鎖策略性能更好,因此這裡主要選擇悲觀鎖這種思想來進行代碼演示。
首先我們設定一個鍵為鎖(Redis就是Key Value型存儲),並使用setnx命令,該命令的特點是,如果鍵存在,則設置定指定的值,並返回1,如果鍵存在,則什麼都不做,並返回0。對應到代碼中來就是,如果執行setnx命令返回了1,則說明獲取到了鎖,代碼可以繼續往下執行,如果執行setnx命令返回0,則說明沒有獲取到鎖,當前線程等待或者重試。
獲取到鎖,執行代碼完畢,需要釋放鎖。怎麼釋放呢,就是很簡單的執行del命令即可,即把鎖的key給刪除,這樣其餘的連接就可以獲取鎖了。
偽代碼如下:
while(con.setnx(lockKey,lockValue)==0){ //休眠 重試獲取鎖}// 執行業務代碼con.del(lockKey);
一切看起來很美好,不是嗎?但現實總是千變萬化的,一種可能是一段代碼的執行,需要在不同地方獲取不同的鎖,導致死鎖的發生,又或者是網絡故障或者客戶進程崩潰,造成鎖永遠無法釋放。這就會導致其餘的Redis連接一直無法獲取到鎖,因為一臺機器的代碼問題,網絡問題,機器故障等原因,導致所有的服務都變得不可用,這是讓人無法接受的,這違背了分布式的初衷。
怎麼解決這個問題呢?我們可以指定一個鎖的過期時間,比如10s後這個鎖會過期,並且極限條件下業務執行時間也不會超過10s。(在服務有互相依賴,複雜的服務調用中,調用鏈越長,超時時間越不好預估,但這個也是在解決問題和性能之間做一個平衡,超時時間設置太長,性能會大大降低,超時時間太短,會造成並發問題,因為一個連接中代碼還沒有執行完,鎖已經被刪除,同時另一個連接獲取到了鎖,執行業務代碼)。設置了鎖的過期時間,解決了鎖不釋放的問題,但是同時引入的新的問題,那就是可能會刪除其餘連接的鎖。比如A連接獲取到鎖key1,執行很長時間,此時鎖過期,被刪除,另一個連接B獲取到鎖key1,並執行對應的代碼,此時A連接執行結束,於是釋放鎖,但是此時,其實它的鎖已經被釋放了,在鎖過期的時候,現在它釋放的是B連接的鎖,那是不對的。假如此刻C連接進來,是能夠獲取到鎖的,那麼就意味著B C在同時執行業務代碼,違背了鎖當初設計的本意,因此絕對不能釋放其餘連接的鎖,而只能釋放自己的。
那麼如何解決這個問題?其實我們可以在連接獲取鎖的時候,設置一個只有當前連接知道的唯一值,釋放的時候會先取出鎖的值,進行比較,只有跟存入的值是一致的時候,才會釋放鎖,也就是刪除鍵,否則,什麼也不做。
分析了這麼多,我們可以看看獲取鎖的工具類代碼:
/** * 在指定的等待時限內獲取鎖 * @param jedis 連接 * @param lockName 鎖名稱 * @param timeOutMillionSeconds 獲取鎖超時時間 -1:一直等待,直到獲取到鎖 * @return 如果獲取到鎖,返回一個鎖標識符,否則返回null */public static String acquireLock(Jedis jedis,String lockName,long timeOutMillionSeconds){ // 略去各類校驗 String identifier = UUID.randomUUID().toString(); long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds; while(System.currentTimeMillis()<=timeEnd){ long result = jedis.setnx(lockName,identifier); if(result==1){// 等於1 說明沒有設置過,獲取鎖成功 return identifier; } try { TimeUnit.MILLISECONDS.sleep(100);// 線程休眠值 根據業務來定 }catch (InterruptedException e){//假設在本機沒有多線程編程 不會通知另一方線程中斷 根據具體業務來 throw new RuntimeException(e); } } return null;}
注意代碼中的jedis沒有close,根據需要,也可以在工具類中close掉。
以及超時鎖,為了防止setnx和expire之間,程序崩潰,造成超時時間沒有設置上,因此其餘的連接在獲取不到鎖的時候,會先判斷鎖有沒有過期時間,如果沒有,給鎖加上過期時間:
/** *在指定的等待時限內獲取鎖,該鎖自身帶有超時特性 * @param jedis 連接 * @param lockName 鎖名稱 * @param timeOutMillionSeconds 獲取鎖超時時間 -1:一直等待,直到獲取到鎖 * @param lockTimeOutSeconds 鎖的超時時間 * @return 如果獲取到鎖,返回一個鎖標識符,否則返回null */ public static String acquireLockTimeOut(Jedis jedis,String lockName,long timeOutMillionSeconds,int lockTimeOutSeconds){ // 略去各類校驗 String identifier = UUID.randomUUID().toString(); long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds; while(System.currentTimeMillis()<=timeEnd){ long result = jedis.setnx(lockName,identifier); if(result==1){// 等於1 說明沒有設置過,獲取鎖成功 jedis.expire(lockName,lockTimeOutSeconds); return identifier; }else{ if(jedis.ttl(lockName)==-1){ jedis.expire(lockName,lockTimeOutSeconds); } } try { TimeUnit.MILLISECONDS.sleep(100);// 線程休眠值 根據業務來定 }catch (InterruptedException e){//假設在本機沒有多線程編程 不會通知另一方線程中斷 根據具體業務來 throw new RuntimeException(e); } } return null; }
釋放鎖的代碼:
/** * 關於watch multi exec等命令,參考https://redis.io/topics/transactions 才能深刻理解 * @param jedis 連接 * @param lockName 鎖名稱 * @param identifier 鎖標識符 * @return 是否成功釋放鎖 僅作為參考 */ public static boolean releaseLock(Jedis jedis,String lockName,String identifier){ // 略去各類校驗 boolean releaseNormal = false;// 鎖是否正常釋放 jedis.watch(lockName); String identifierInRedis = jedis.get(lockName); if(identifier.equalsIgnoreCase(identifierInRedis)){// 如果標識符沒有改動,則說明可以解鎖 Transaction transaction = jedis.multi(); transaction.del(lockName); transaction.exec(); releaseNormal = true; }else{ jedis.unwatch(); releaseNormal = false; } return releaseNormal; }
這裡重點解釋一下釋放鎖的操作:只有現在取出的跟當時存入的值一致,才會進行刪除操作。但為了防止get 和del之間的某個時候,另一個連接修改了鎖的值,(為什麼會修改?是因為當前連接A在執行完get之後,鎖過期了,因此另一個連接B可以獲取到鎖,現在A執行刪除操作,就是刪除B連接獲取到的鎖),因此需要watch 操作,如果現在取出的值和當初存入的不一致,那麼直接執行unwatch並返回。為什麼要執行unwatch呢?因為為了安全,假如不執行unwatch就返回,在後續的代碼中執行multi和exec,那就有很大的問題,當鎖被刪除或者修改,就會打斷當前的事務,但是該事物跟鎖是沒有任何關係的,所以unwatch是一個需要執行的操作。另一個情況是假如當前連接取出的鎖的值,跟存入的一致,就需要執行刪除鎖的操作。可能有同學就會問了,Redis的所有操作都是原子操作,執行del和包裹在multi exec中執行del不是一樣的原子操作嗎?為何還要多此一舉,讓代碼變得不好理解。在這一點上,它們確實並無任何區別,但是重點是之前有一個watch命令。假如沒有執行watch multi del exec這樣的順序,就會有釋放掉其餘連接的鎖的風險,為什麼會這樣,上文已經做了分析。在get和del之間發生的事情,當前連接是不知道的,get del的執行不是原子性的。有了watch multi del exec這個順序,當前連接A get執行之後,鎖失效,且被另一個連接B獲取到鎖,也就是修改了鎖,因為watch(key),所以當前連接就知道了有人修改了。當執行exec的時候,就會丟棄掉del命令,因為watch的通知使得事務已經失效了,這保證了其餘連接的鎖不會被刪除。同時,當執行exec的時候,不論事務成功與否,都unwatch了。最終呢,釋放鎖的代碼看起來就是這樣了。
寫好了工具類,我們應該測試一下,看看是不是真的,我們可以使用線程池,放入200個任務,每個任務都是執行獲取Redis的某個鍵,並+1,再設置回Redis。在執行代碼前,進行鎖獲取,執行完畢,進行鎖釋放。為了等到所有線程執行完畢,便於獲取最終執行結果,使用CountDownLatch進行等待線程池所有任務的執行完畢。另外的一些部分是初始化動作,防止鎖已經設置了或者指定的鍵已經有值了。代碼如下:
public class LockTest { private static final Log log = LogFactory.getLog(LockTest.class); public static void main(String[] args)throws Exception { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy()); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100"); final CountDownLatch countDownLatch = new CountDownLatch(200); final String lockName = "lock_a"; Jedis jedisInit = jedisPool.getResource(); jedisInit.del(lockName); final String testResource = "test_str"; jedisInit.set(testResource,"0"); jedisInit.close(); for(int i=0;i<200;i++){ threadPoolExecutor.execute(new Runnable() { public void run() { Jedis jedis = jedisPool.getResource(); String identifiler = RedisSetnxLock.acquireLock(jedis,lockName,-1); if(identifiler==null) { log.info("獲取鎖失敗"); countDownLatch.countDown(); jedis.close(); return; } try{ String value = jedis.get(testResource); Thread.sleep(200);// 故意休眠 jedis.set(testResource,(Integer.parseInt(value)+1)+""); }catch (Exception e){ e.printStackTrace(); }finally { RedisSetnxLock.releaseLock(jedis,lockName,identifiler); jedis.close(); countDownLatch.countDown(); } } }); } countDownLatch.await(); log.info(jedisPool.getResource().get(testResource)); threadPoolExecutor.shutdown(); }}
最後的執行結果符合預期。
為了演示連接掛掉或者執行超常任務的情形,可以執行下面的測試:
public class LockTest { private static final Log log = LogFactory.getLog(LockTest.class); public static void main(String[] args)throws Exception { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy()); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100"); final CountDownLatch countDownLatch = new CountDownLatch(200); final String lockName = "lock_a"; Jedis jedisInit = jedisPool.getResource(); jedisInit.del(lockName); final String testResource = "test_str"; jedisInit.set(testResource,"0"); jedisInit.close(); for(int i=0;i<200;i++){ threadPoolExecutor.execute(new Runnable() { public void run() { Jedis jedis = jedisPool.getResource(); // 鎖的超時時間為1s String identifiler = RedisSetnxLock.acquireLockTimeOur(jedis,lockName,-1,1); if(identifiler==null) { log.info("獲取鎖失敗"); countDownLatch.countDown(); jedis.close(); return; } try{ String value = jedis.get(testResource); Thread.sleep(200); jedis.set(testResource,(Integer.parseInt(value)+1)+""); }catch (Exception e){ e.printStackTrace(); }finally { // 每次不釋放鎖,模擬執行超常任務或者進程掛掉的情形 //RedisSetnxLock.releaseLock(jedis,lockName,identifiler); jedis.close(); countDownLatch.countDown(); } } }); } countDownLatch.await(); log.info(jedisPool.getResource().get(testResource)); threadPoolExecutor.shutdown(); }}
執行結果同樣正確,只是執行時間變長了。
相關maven pom:
<!--jedis client--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency> <!--jedis連接池--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>2.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.13.1</version> </dependency>
參考書籍:《Redis實戰》,一本非常棒的書。