在我們的工作中,很多地方使用延遲隊列,比如訂單到期沒有付款取消訂單,制訂一個提醒的任務等都需要延遲隊列,那麼我們需要實現延遲隊列。我們本文的梗概如下,同學們可以選擇性閱讀。
1. 實現一個簡單的延遲隊列。
我們知道目前JAVA可以有DelayedQueue,我們首先開一個DelayQueue的結構類圖。DelayQueue實現了Delay、BlockingQueue接口。也就是DelayQueue是一種阻塞隊列。
我們在看一下Delay的類圖。Delayed接口也實現了Comparable接口,也就是我們使用Delayed的時候需要實現CompareTo方法。因為隊列中的數據需要排一下先後,根據我們自己的實現。Delayed接口裡邊有一個方法就是getDelay方法,用於獲取延遲時間,判斷是否時間已經到了延遲的時間,如果到了延遲的時間就可以從隊列裡邊獲取了。
我們創建一個Message類,實現了Delayed接口,我們主要把getDelay和compareTo進行實現。在Message的構造方法的地方傳入延遲的時間,單位是毫秒,計算好觸發時間fireTime。同時按照延遲時間的升序進行排序。我重寫了裡邊的toString方法,用於將Message按照我寫的方法進行輸出。
裡邊的main方法裡邊聲明了兩個Message,一個延遲5秒,一個延遲7秒,時間到了之後會將接取出並且列印。輸出的結果如下,正是我們所期望的。
這個方法實現起來真的非常簡單。但是缺點也是很明顯的,就是數據在內存裡邊,數據比較容易丟失。那麼我們需要採用Redis實現分布式的任務處理。
2. 使用Redis的list實現分布式延遲隊列。
本地需要安裝一個Redis,我自己是使用Docker構建一個Redis,非常快速,命令也沒多少。我們直接啟動Redis並且暴露6379埠。進入之後直接使用客戶端命令即可查看和調試數據。
我本地採用spring-boot的方式連接redis,pom文件列一下,供大家參考。
加上Redis的配置放到application.properties裡邊即可實現Redis連接,非常的方便。
接下來實現一個基於Redis的list數據類型進行實現的一個類。我們使用RedisTemplate操作Redis,這個裡邊封裝好我們所需要的Redis的一些方法,用起來非常方便。這個類允許延遲任務做多有10W個,也是避免數據量過大對Redis造成影響。如果在線上使用的時候也需要考慮延遲任務的多少。太多幾百萬幾千萬的時候可能數據量非常大,我們需要計算Redis的空間是否夠。這個代碼也是非常的簡單,一個用於存放需要延遲的消息,採用offer的方法。另外一個是啟動一個線程, 如果消息時間到了,那麼就將數據lpush到Redis裡邊。
接下來我們看一下,我們寫一個測試的controller。大家看一下這個請求/redis/listDelayedQueue的代碼位置。我們也是生成了兩個消息,然後把消息放到隊列裡邊,另外我們在啟動一個線程任務,用於將數據從Redis的list中獲取。方法也非常簡單。
我就不把運行結果寫出來了,感興趣的同學自己自行試驗。當然這個方法也是從內存中拿出數據,到時間之後放到Redis裡邊,還是會存在程序啟動的時候,任務進行丟失。我們繼續看另外一種方法更好地進行這個問題的處理。
3. 使用Redis的zSet實現分布式延遲隊列。
我們需要再寫一個ZSet的隊列處理。下邊的offerMessage主要是把消息直接放入緩存中。採用Redis的ZSET的zadd方法。zadd(key, value, score) 即將key=value的數據賦予一個score, 放入緩存中。score就是計算出來延遲的毫秒數。
上邊的Controller方法已經寫好了測試的方法。/redis/zSetDelayedQueue,裡邊主要使用ZSet的zRangeByScore(key, min, max)。主要是從score從0,當前時間的毫秒數獲取。取出數據後再採用removeRangeByScore,將數據刪除。這樣數據可以直接寫到Redis裡邊,然後取出數據後直接處理。這種方法比前邊的方法稍微好一些,但是實際上還存在一些問題,因為依賴Redis,如果Redis內存不足或者連不上的時候,系統將變得不可用。
4. 總結一下,另外還有哪些可以延遲隊列。
上面的方法其實還是存在問題的,比如系統重啟的時候還是會造成任務的丟失。所以我們在生產上使用的時候,我們還需要將任務保存起來,比如放到資料庫和文件存儲系統將數據存儲起來,這樣做到double-check,雙重檢查,最終達到任務的99.999%能夠處理。
其實還有很多東西可以實現延遲隊列。
1) RabbitMQ就可以實現此功能。這個消息隊列可以把數據保存起來並且進行處理。
2)Kafka也可以實現這個功能。
3)Netty的HashedWheelTimer也可以實現這個功能。
有興趣的同學可以進一步研究這些內容的實現。
歡迎大家留言評論,做進一步探討!