Java實現簡單延遲隊列和分布式延遲隊列

2020-12-20 計算機java編程

在我們的工作中,很多地方使用延遲隊列,比如訂單到期沒有付款取消訂單,制訂一個提醒的任務等都需要延遲隊列,那麼我們需要實現延遲隊列。我們本文的梗概如下,同學們可以選擇性閱讀。

1. 實現一個簡單的延遲隊列。

我們知道目前JAVA可以有DelayedQueue,我們首先開一個DelayQueue的結構類圖。DelayQueue實現了Delay、BlockingQueue接口。也就是DelayQueue是一種阻塞隊列。

我們在看一下Delay的類圖。Delayed接口也實現了Comparable接口,也就是我們使用Delayed的時候需要實現CompareTo方法。因為隊列中的數據需要排一下先後,根據我們自己的實現。Delayed接口裡邊有一個方法就是getDelay方法,用於獲取延遲時間,判斷是否時間已經到了延遲的時間,如果到了延遲的時間就可以從隊列裡邊獲取了。

我們創建一個Message類,實現了Delayed接口,我們主要把getDelay和compareTo進行實現。在Message的構造方法的地方傳入延遲的時間,單位是毫秒,計算好觸發時間fireTime。同時按照延遲時間的升序進行排序。我重寫了裡邊的toString方法,用於將Message按照我寫的方法進行輸出。

裡邊的main方法裡邊聲明了兩個Message,一個延遲5秒,一個延遲7秒,時間到了之後會將接取出並且列印。輸出的結果如下,正是我們所期望的。

這個方法實現起來真的非常簡單。但是缺點也是很明顯的,就是數據在內存裡邊,數據比較容易丟失。那麼我們需要採用Redis實現分布式的任務處理。

2. 使用Redis的list實現分布式延遲隊列。

本地需要安裝一個Redis,我自己是使用Docker構建一個Redis,非常快速,命令也沒多少。我們直接啟動Redis並且暴露6379埠。進入之後直接使用客戶端命令即可查看和調試數據。

我本地採用spring-boot的方式連接redis,pom文件列一下,供大家參考。

加上Redis的配置放到application.properties裡邊即可實現Redis連接,非常的方便。

接下來實現一個基於Redis的list數據類型進行實現的一個類。我們使用RedisTemplate操作Redis,這個裡邊封裝好我們所需要的Redis的一些方法,用起來非常方便。這個類允許延遲任務做多有10W個,也是避免數據量過大對Redis造成影響。如果在線上使用的時候也需要考慮延遲任務的多少。太多幾百萬幾千萬的時候可能數據量非常大,我們需要計算Redis的空間是否夠。這個代碼也是非常的簡單,一個用於存放需要延遲的消息,採用offer的方法。另外一個是啟動一個線程, 如果消息時間到了,那麼就將數據lpush到Redis裡邊。

接下來我們看一下,我們寫一個測試的controller。大家看一下這個請求/redis/listDelayedQueue的代碼位置。我們也是生成了兩個消息,然後把消息放到隊列裡邊,另外我們在啟動一個線程任務,用於將數據從Redis的list中獲取。方法也非常簡單。

我就不把運行結果寫出來了,感興趣的同學自己自行試驗。當然這個方法也是從內存中拿出數據,到時間之後放到Redis裡邊,還是會存在程序啟動的時候,任務進行丟失。我們繼續看另外一種方法更好地進行這個問題的處理。

3. 使用Redis的zSet實現分布式延遲隊列。

我們需要再寫一個ZSet的隊列處理。下邊的offerMessage主要是把消息直接放入緩存中。採用Redis的ZSET的zadd方法。zadd(key, value, score) 即將key=value的數據賦予一個score, 放入緩存中。score就是計算出來延遲的毫秒數。

上邊的Controller方法已經寫好了測試的方法。/redis/zSetDelayedQueue,裡邊主要使用ZSet的zRangeByScore(key, min, max)。主要是從score從0,當前時間的毫秒數獲取。取出數據後再採用removeRangeByScore,將數據刪除。這樣數據可以直接寫到Redis裡邊,然後取出數據後直接處理。這種方法比前邊的方法稍微好一些,但是實際上還存在一些問題,因為依賴Redis,如果Redis內存不足或者連不上的時候,系統將變得不可用。

4. 總結一下,另外還有哪些可以延遲隊列。

上面的方法其實還是存在問題的,比如系統重啟的時候還是會造成任務的丟失。所以我們在生產上使用的時候,我們還需要將任務保存起來,比如放到資料庫和文件存儲系統將數據存儲起來,這樣做到double-check,雙重檢查,最終達到任務的99.999%能夠處理。

其實還有很多東西可以實現延遲隊列。

1) RabbitMQ就可以實現此功能。這個消息隊列可以把數據保存起來並且進行處理。

2)Kafka也可以實現這個功能。

3)Netty的HashedWheelTimer也可以實現這個功能。

有興趣的同學可以進一步研究這些內容的實現。

歡迎大家留言評論,做進一步探討!

相關焦點

  • 如何使用Spring Boot與RabbitMQ結合實現延遲隊列
    延遲重試。比如消費者從隊列裡消費消息時失敗了,但是想要延遲一段時間後自動重試。如果不使用延遲隊列,那麼我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便於開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。如何實現?
  • 五分鐘學後端技術:如何學習後端工程師必學的消息隊列
    此特性使得RabbitMQ易於使用和部署,但是使得其運行速度較慢,因為中央節點增加了延遲,消息封裝後也比較大;需要學習比較複雜的接口和協議,學習和維護成本較高;ActiveMQActiveMQ是由Apache出品,ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。
  • 這些MQ概念你都懂嗎:死信隊列、重試隊列、消息回溯等
    02.延遲隊列當你在網上購物的時候是否會遇到這樣的提示:「三十分鐘之內未付款,訂單自動取消」?這個是延遲隊列的一種典型應用場景。延遲隊列存儲的是對應的延遲消息,所謂「延遲消息」是指當消息被發送以後,並不想讓消費者立刻拿到消息,而是等待特定時間後,消費者才能拿到這個消息進行消費。延遲隊列一般分為兩種:基於消息的延遲和基於隊列的延遲。
  • 輕量級消息隊列RedisQueue
    消息隊列(Message Queue)是分布式系統必不可少的中間件,大部分消息隊列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用於中小團隊,並且對.NET技術的支持力度不夠。而Redis實現的輕量級消息隊列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!
  • java中的Queue隊列的用法
    大家好,歡迎來到雄雄的小課堂,今天給大家分享的是「java中的Queue隊列的用法」 前言:好多人對Queue不是很熟悉,畢竟平時也不怎麼用,遇到集合要麼List要麼map這些常用的,殊不知,java中還有個Queue,今天,我們就來看看Queue的用法。
  • Java中常用隊列的總結
    隊列是一種先進先出(FIFO)的抽象數據結構,在Java中,隊列使用了兩種數據類型來實現的,分別是:數組和鍊表這兩種數據結構。本文主要內容:回顧Java中常用的七個阻塞隊列進行總結及阻塞隊列中四組AP並進行總結。本文來源:本文是由凱哥Java(kaigejava)原創發布。
  • 用兩個棧實現隊列(劍指 Offer 題解Java版)
    Push 和 Pop 操作。當元素要出棧時,需要先進入out棧,此時元素出棧順序再一次被反轉,因此出棧順序就和最開始入棧順序是相同的,先進入的元素先退出,這就是隊列的順序。分析棧的特點是先進後出,而隊列的特點是先進先出,主要就是在兩個棧中來回倒騰從而實現隊列的功能,就好像一個黑盒子,裡邊是兩個棧的操作,但其他人在用這個黑盒子的時候,感覺就像在用隊列一樣。隊列和棧只是邏輯性的數據結構,實現隊列和棧可以用數組實現,也可以用鍊表實現,只要滿足隊列先進先出,棧先進後出的特性就可以。
  • 為了互動直播,如何讓直播技術實現低延遲?
    而要獲得互動直播技術,實現低延遲是必須的。因此低延遲很重要。那麼,直播技術如何實現低延遲呢? 請允許我根據即構科技直播技術的經驗,和各位分享一下如何實現低延遲。 即構科技的連麥互動直播技術,連麥方的延遲400毫秒,觀看方的延遲1秒左右。目前映客直播,花椒直播,一直播和慄子直播都採用了即構科技的連麥互動直播技術。
  • 阻塞隊列實現生產者消費者以及同步工具類
    阻塞隊列阻塞隊列提供可阻塞的put和take方法,支持定時的offer和poll方法,如果隊列已經滿了,那麼put方法將阻塞直到有空間可用;如果隊列為空,那麼take方法將會阻塞直到有元素可用;同時隊列可以是有界也可以是無界的,無界隊列永遠都不會充滿,因此無界隊列的put方法永遠不會阻塞;
  • Java中常用的七個阻塞隊列第二篇DelayQueue源碼介紹
    本文出自凱哥Java(kaigejava)的《凱哥Java並發系列》之《Java並發編程之隊列》系列的第三篇:《Java中常用的七個阻塞隊列第二篇DelayQueue源碼介紹》Java中常用的幾個隊列中,阻塞隊列還有四個沒介紹。
  • java基礎容器學習之雙端隊列ArrayDeque
    引言:ArrayDeque,被稱為「雙端隊列」,可以從兩端進行插入或刪除操作,當需要使用棧時,Java已不推薦使用Stack,而是推薦使用更高效的ArrayDeque,當需要使用隊列時也可以使用ArrayDeque。
  • Java實現單鍊表、棧、隊列三種數據結構
    它裡面的數據元素是以結點為單位,每個結點是由數據元素的數據和下一個結點的地址組成,在java集合框架裡面  LinkedList、HashMap(數組加鍊表)等等的底層都是用鍊表實現的。三、隊列(Queue)1、隊列的特點也用「先進先出」四個字來概括。就是先進去的元素先輸出出來。
  • rabbitmq隊列之發送消息到指定隊列
    這裡是你的朝花夕拾、樂於分享動漫、生活小竅門、java程序小工具等給大家今天來分享一個:寫入消息到rabbitmq的java開發小工具類。(筆者已經製作成集MQ發送和接收並生成日誌文件的腳本小工具,需要的可以私聊我,我看到的話會第一時間回復給你。)
  • java並發編程之深入學習Concurrent包(十三,雙端阻塞隊列)
    引言:春節疫情的關係,宅在家,有空學習一下java並發中的內容,順便發出來請大家一起瞅瞅。上一章學習了阻塞隊列,這次一起學習下雙端阻塞隊列。LinkedBlockingDeque簡介:LinkedBlockingDeque是雙向鍊表實現的雙端阻塞隊列。該阻塞隊列可以從隊列的頭和尾進行插入和刪除,且該阻塞隊列是線程安全的。
  • Spring全家桶、Dubbo、分布式、消息隊列後端必備全套開源項目
    《Spring Boot 消息隊列 RocketMQ 入門》 對應 lab-31《Spring Boot 消息隊列 Kafka 入門》 對應 lab-03-kafka《Spring Boot 消息隊列 RabbitMQ 入門》 對應
  • 淺入淺出消息隊列
    看到「生產者」和「消費者」,不知道有沒有想起來在 RPC 的中同樣也有生產者和消費者,那麼這兩者之間有關聯嗎?這樣就實現了各個系統間的解耦,同時可以把失敗策略、重試等作為一個機制,對各個應用透明,直接在 MQ 與各調用方的應用接口層面實現即可,如下圖所示:
  • 程式設計師經典面試題,消息隊列的作用,你能說出幾個?
    今天我們來聊一聊消息隊列的作用。消息隊列,相信大家都不陌生,Kafka、RMQ都是大家常用的隊列,也是程式設計師面試中的一個常見的題目。進行削峰,減少並發數據在後臺各個系統中流轉就跟流水線上的工人一樣,如果前面的工人幹得非常快,那麼工作就會不停地堆積,很多零件就堆積著等著下面的工人解決。
  • Redis實現分布式阻塞隊列
    Redis分布式鎖實現原理分布式鎖本質上要實現的目標就是在 Redis 裡面佔一個「茅坑」,當別的進程也要來佔時,發現已經有人蹲在那裡了,就只好放棄或者稍後再試。佔坑一般是使用 setnx(set if not exists) 指令,只允許被一個客戶端佔坑。先來先佔, 用完了,再調用 del 指令釋放茅坑。
  • linux kernel工作隊列及源碼詳細講解
    重新定義工作結構參數#define PREPARE_WORK(_work, _func, _data)do {(_work)->func = _func;(_work)->data = _data;} while (0)/** initialize all of a work-struct:*/// 初始化工作結構, 和_
  • 帶UI界面的代碼統計小工具--進程、隊列的並發應用
    遍歷目錄下的所有文件,將所有python,java,C文件路徑放到隊列裡。2.    創建多個進程,從隊列中獲取文件路徑,分別統計各類型文件的代碼行數。3.    將統計的結果,存放到一個變量裡,並且實現多進程間變量共享。4.    製作一個圖形界面,用於選擇文件夾和統計代碼,並展示統計結果。