如何使用Spring Boot與RabbitMQ結合實現延遲隊列

2020-12-13 CSDN

顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之後就會被消費者馬上消費。

延遲隊列能做什麼?

延遲隊列多用於需要延遲工作的場景。最常見的是以下兩種場景:

延遲消費。比如:

用戶生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。

用戶註冊成功之後,需要過一段時間比如一周後校驗用戶的使用情況,如果發現用戶活躍度較低,則發送郵件或者簡訊來提醒用戶使用。

延遲重試。比如消費者從隊列裡消費消息時失敗了,但是想要延遲一段時間後自動重試。

如果不使用延遲隊列,那麼我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便於開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。

如何實現?

別急,在下文中,我們將詳細介紹如何利用 Spring Boot 加 RabbitMQ 來實現延遲隊列。

實現思路

在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允許我們為消息或者隊列設置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在經過TTL秒後「死亡」,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。更多資料請查閱 官方文檔 。

Dead Letter Exchange

剛才提到了,被設置了TTL的消息在過期後會成為Dead Letter。其實在RabbitMQ中,一共有三種消息的「死亡」形式:

消息被拒絕。通過調用basic.reject或者basic.nack並且設置的requeue參數為false。

消息因為設置了TTL而過期。

消息進入了一條已經達到最大長度的隊列。

如果隊列設置了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱 官方文檔 。

流程圖

聰明的你肯定已經想到了,如何將RabbitMQ的TTL和DLX特性結合在一起,實現一個延遲隊列。

針對於上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:

延遲消費

延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產者產生的消息首先會進入緩衝隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之後,這些消息會通過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。

延遲重試

延遲重試本質上也是延遲消費的一種,但是這種模式的結構與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。

如下圖所示,消費者發現該消息處理出現了異常,比如是因為網絡波動引起的異常。那麼如果不等待一段時間,直接就重試的話,很可能會導致在這期間內一直無法成功,造成一定的資源浪費。那麼我們可以將其先放在緩衝隊列中(圖中紅色隊列),等消息經過一段的延遲時間後再次進入實際消費隊列中(圖中藍色隊列),此時由於已經過了「較長」的時間了,異常的一些波動通常已經恢復,這些消息可以被正常地消費。

如果你想學習Java工程化、高性能及分布式、高性能、深入淺出。性能調優、Spring,MyBatis,Netty源碼分析和大數據等知識點可以來找我。

而現在我就有一個平臺可以提供給你們學習,讓你在實踐中積累經驗掌握原理。主要方向是JAVA架構師。如果你想拿高薪,想突破瓶頸,想跟別人競爭能取得優勢的,想進BAT但是有擔心面試不過的,可以+我的Java架構進階群:554355695

代碼實現

接下來我們將介紹如何在Spring Boot中實現基於RabbitMQ的延遲隊列。我們假設讀者已經擁有了Spring Boot與RabbitMQ的基本知識。如果想快速了解Spring Boot的相關基礎知識,可以參考我之前寫的一篇文章。

初始化工程

首先我們在Intellij中創建一個Spring Boot工程,並且添加 spring-boot-starter-amqp 擴展。

配置隊列

從上述的流程圖中我們可以看到,一個延遲隊列的實現,需要一個緩衝隊列以及一個實際的消費隊列。又由於在RabbitMQ中,我們擁有兩種消息過期的配置方式,所以在代碼中,我們一共配置了三條隊列:

delay_queue_per_message_ttl:TTL配置在消息上的緩衝隊列。

delay_queue_per_queue_ttl:TTL配置在隊列上的緩衝隊列。

delay_process_queue:實際消費隊列。

我們通過Java Config的方式將上述的隊列配置為Bean。由於我們添加了 spring-boot-starter-amqp 擴展,Spring Boot在啟動時會根據我們的配置自動創建這些隊列。為了方便接下來的測試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個,且過期的消息都會通過DLX轉發到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

@Bean QueuedelayQueuePerMessageTTL(){ return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter發送到的exchange .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .build(); }

其中, x-dead-letter-exchange 聲明了隊列裡的死信轉發到的DLX名稱, x-dead-letter-routing-key 聲明了這些死信在轉發時攜帶的routing-key名稱。

delay_queue_per_queue_ttl

類似地,delay_queue_per_queue_ttl的配置代碼:

@Bean QueuedelayQueuePerQueueTTL(){ return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設置隊列的過期時間 .build(); }

delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個 x-message-ttl ,該配置用來設置隊列的過期時間。

delay_process_queue

delay_process_queue的配置最為簡單:

@Bean QueuedelayProcessQueue(){ return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build(); }

配置Exchange

配置DLX

首先,我們需要配置DLX,代碼如下:

@Bean DirectExchangedelayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); }

然後再將該DLX綁定到實際消費隊列即delay_process_queue上。這樣所有的死信都會通過DLX被轉發到delay_process_queue:

@Bean BindingdlxBinding(Queue delayProcessQueue, DirectExchange delayExchange){ return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME); }

配置延遲重試所需的Exchange

從延遲重試的流程圖中我們可以看到,消息處理失敗之後,我們需要將消息轉發到緩衝隊列,所以緩衝隊列也需要綁定一個Exchange。 在本例中,我們將delay_process_per_queue_ttl作為延遲重試裡的緩衝隊列 。具體代碼是如何配置的,這裡就不贅述了,大家可以查閱我 Github 中的代碼。

定義消費者

我們創建一個最簡單的消費者ProcessReceiver,這個消費者監聽delay_process_queue隊列,對於接受到的消息,他會:

如果消息裡的消息體不等於FAIL_MESSAGE,那麼他會輸出消息體。

如果消息裡的消息體恰好是FAIL_MESSAGE,那麼他會模擬拋出異常,然後將該消息重定向到緩衝隊列(對應延遲重試場景)。

另外,我們還需要新建一個監聽容器用於存放消費者,代碼如下:

@Bean SimpleMessageListenerContainerprocessContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監聽delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container; }

至此,我們前置的配置代碼已經全部編寫完成,接下來我們需要編寫測試用例來測試我們的延遲隊列。

編寫測試用例

延遲消費場景

首先我們編寫用於測試TTL設置在消息上的測試代碼。

我們藉助 spring-rabbit 包下提供的RabbitTemplate類來發送消息。由於我們添加了 spring-boot-starter-amqp 擴展,Spring Boot會在初始化時自動地將RabbitTemplate當成bean加載到容器中。

解決了消息的發送問題,那麼又該如何為每個消息設置TTL呢?這裡我們需要藉助MessagePostProcessor。MessagePostProcessor通常用來設置消息的Header以及消息的屬性。我們新建一個ExpirationMessagePostProcessor類來負責設置消息的TTL屬性:

/** * 設置消息的失效時間 */ public class ExpirationMessagePostProcessorimplements MessagePostProcessor{ private final Long ttl; // 毫秒 public ExpirationMessagePostProcessor(Long ttl){ this.ttl = ttl; } @Override public Message postProcessMessage(Message message)throws AmqpException { message.getMessageProperties() .setExpiration(ttl.toString()); // 設置per-message的失效時間 return message; } }

然後在調用RabbitTemplate的convertAndSend方法時,傳入ExpirationMessagePostPorcessor即可。我們向緩衝隊列中發送3條消息,過期時間依次為1秒,2秒和3秒。具體的代碼如下所示:

@Test public void testDelayQueuePerMessageTTL()throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) { long expiration = i * 1000; rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME, (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration)); } ProcessReceiver.latch.await(); }

細心的朋友一定會問,為什麼要在代碼中加一個CountDownLatch呢?這是因為如果沒有latch阻塞住測試方法的話,測試用例會直接結束,程序退出,我們就看不到消息被延遲消費的表現了。

那麼類似地,測試TTL設置在隊列上的代碼如下:

@Test public void testDelayQueuePerQueueTTL()throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) { rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME, "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION); } ProcessReceiver.latch.await(); }

我們向緩衝隊列中發送3條消息。理論上這3條消息會在4秒後同時過期。

延遲重試場景

我們同樣還需測試延遲重試場景。

@Test public void testFailMessage()throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(6); for (int i = 1; i <= 3; i++) { rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE); } ProcessReceiver.latch.await(); }

我們向delay_process_queue發送3條會觸發FAIL的消息,理論上這3條消息會在4秒後自動重試。

查看測試結果

延遲消費的場景測試我們分為了TTL設置在消息上和TTL設置在隊列上兩種。首先,我們先看一下TTL設置在消息上的測試結果:

從上圖中我們可以看到,ProcessReceiver分別經過1秒、2秒、3秒收到消息。測試結果表明消息不僅被延遲消費了,而且每條消息的延遲時間是可以被個性化設置的。TTL設置在消息上的延遲消費場景測試成功。

然後,TTL設置在隊列上的測試結果如下圖:

從上圖中我們可以看到,ProcessReceiver經過了4秒的延遲之後,同時收到了3條消息。測試結果表明消息不僅被延遲消費了,同時也證明了當TTL設置在隊列上的時候,消息的過期時間是固定的。TTL設置在隊列上的延遲消費場景測試成功。

接下來,我們再來看一下延遲重試的測試結果:

ProcessReceiver首先收到了3條會觸發FAIL的消息,然後將其移動到緩衝隊列之後,過了4秒,又收到了剛才的那3條消息。延遲重試場景測試成功。

相關焦點

  • 詳解SpringCloud中RabbitMQ消息隊列原理及配置,一篇就夠!
    這樣,如果微信通知不能正常使用,也不影響用戶下單,用戶下單後,只用把下單通知信息寫入消息隊列,不用關心後續操作,實現了訂單系統和通知系統的解耦。3、流量削峰一般在秒殺或者團購活動中使用。rabbitmq已經被spring-boot做了整合訪問實現。spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。
  • Java實現簡單延遲隊列和分布式延遲隊列
    在我們的工作中,很多地方使用延遲隊列,比如訂單到期沒有付款取消訂單,制訂一個提醒的任務等都需要延遲隊列,那麼我們需要實現延遲隊列。我們本文的梗概如下,同學們可以選擇性閱讀。1. 實現一個簡單的延遲隊列。
  • Springboot2.2.6構建RabbitMQ消息發布端代碼
    接續之前文章AMQP協議、模型及RabbitMQ常用組件消息中間件RabbitMQ、微服務,以及數據一致性問題消息中間件RabbitMQ,為什麼使用RabbitMQ以及它支持的場景大家好,我是技術人小Top今天咱們來介紹如何使用RabbitMQ構建消息發布端 ^-^
  • 消息隊列:Rabbitmq如何保證不丟消息
    confrim方式使用的API:https://godoc.org/github.com/streadway/amqp#Channel.Confirm場景2: 消費者從queue中獲取消息如何保證不丟失?
  • Spring集成RabbitMQ簡單實現RPC
    整合Rabbit MQ提供了Reply來實現RPC,AMQP協議定義了14中消息的屬性,其中兩項,一項是Replyto,表示返回消息的隊列,一個是correlationId 用來表示發送消息和返回消息的標誌,來區分是否是一個調用下面一步步來實現RPC首先貼出spring配置文件代碼
  • Springboot2.2.6構建RabbitMQ消息異常處理
    Springboot2.2.6構建RabbitMQ消息接收端代碼>大家好,我是技術人小Top今天咱們來介紹如何使用RabbitMQ構建消息異常處理 ^-^RabbitMQ官網:www.rabbitmq.com上次介紹了Spring2.26如何構建RabbitMQ消息接收端具體到應用開發,需要使用
  • 使用Spring Boot開發的10個免費開源項目
    該項目隨著Spring boot 2.0.5的發布而開發。原始碼 -  https://github.com/RameshMF/java-blog-aggregator-boot現場演示 -  https://www.topjavablogs.com/3.
  • 基於Spring Boot和Spring Cloud實現微服務架構
    Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • 基於 Spring Boot 和 Spring Cloud 實現微服務架構
    Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • 基於Spring Boot和Spring Cloud實現微服務架構學習
    Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。
  • 實戰Spring Boot 2.0系列:單機定時任務的幾種實現
    使用這種方式可以讓你的程序按照某一個 頻度執行,但不能在 指定時間 運行。現在一般用的較少。Quartz 功能強大,可以結合 資料庫 做 持久化,進行 分布式 的 任務延時調度。} dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") }}apply plugin: 'java'
  • 實現微服務架構最流行Style,Spring Boot+Spring Cloud
    ,具體來說當你使用maven dependency引入spring jar包時它就在工作了。Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。
  • Spring Boot 配置文件的多環境實現
    可以獲取spring.application.json 或者 SPRING_APPLICATION_JSON的參數作為spring boot參數一種方式可以設置系統參數放入到systemProperties環境中配置運行參數 。
  • Spring全家桶、Dubbo、分布式、消息隊列後端必備全套開源項目
    入門》 對應 lab-31《Spring Boot 消息隊列 Kafka 入門》 對應 lab-03-kafka《Spring Boot 消息隊列 RabbitMQ 入門》 對應 lab-04-rabbitmq《Spring Boot 消息隊列
  • Spring Boot 採用Sharding-JDBC 實現Mybaits的分庫分表功能
    理論上可支持任意實現JDBC規範的資料庫。目前支持MySQL,Oracle,SQLServer和PostgreSQL。Sharding-JDBC定位為輕量級java框架,使用客戶端直連資料庫,以jar包形式提供服務,未使用中間層,無需額外部署,無其他依賴,DBA也無需改變原有的運維方式。採用」半理解」理念的SQL解析引擎,以達到性能與兼容性的最大平衡。
  • Spring Boot 和 Spring 到底有啥區別?
    我們通常使用Spring Test,JUnit,Hamcrest和Mockito庫。在Spring項目中,我們應該將所有這些庫添加為依賴項。但是在Spring Boot中,我們只需要添加spring-boot-starter-test依賴項來自動包含這些庫。Spring Boot為不同的Spring模塊提供了許多依賴項。
  • Spring Boot中Tomcat、Jetty、Undertow的使用
    Tomcat在我們使用SpringBoot開發WebApi時,會引入spring-boot-starter-web這個starter組件,其自帶了Tomcat容器,所以我們平時新建項目啟動起來,會看見Tomcat相關的一些信息。
  • Spring Boot集成JDBCTemplate
    JDBC簡介Java資料庫連接,(Java Database Connectivity,簡稱JDBC)是Java語言中用來規範客戶端程序如何來訪問資料庫的應用程式接口,提供了諸如查詢和更新資料庫中數據的方法。它由一組用Java語言編寫的類和接口組成。通常說的JDBC是面向關係型資料庫的。
  • 一文搞懂如何在Spring Boot 正確中使用JPA
    一 JPA 基礎:常見操作1.相關依賴我們需要下面這些依賴支持我們完成這部分內容的學習: <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <
  • spring cloud如何使用RestTemplate和Ribbon相結合實現負載均衡
    前景提示:本文假設您已經使用maven構建了moudles工程,並新建了eureka-server和eureka-client兩個工程,實現了服務的註冊與發現。、Ribbon的起步依賴spring-cloud-starter-ribbon,以及web的起步依賴spring-boot-starter-web,代碼如下:在工程的配置文件application.yml做程序相關的配置,包括指定程序名為eureka-ribbon-client,程序的埠號為8764,服務的註冊地址為 http://localhost:8761/eureka/ ,代碼如下