接續之前文章
Springboot2.2.6構建RabbitMQ消息接收端代碼
Springboot2.2.6構建RabbitMQ消息發布端代碼
大家好,我是技術人小Top
今天咱們來介紹如何使用RabbitMQ構建消息異常處理 ^-^
上次介紹了Spring2.26如何構建RabbitMQ消息接收端
具體到應用開發,需要使用RabbitMQ API來實現具體業務場景
現在開始進入實戰
01消息是否落地保存
建議:落地保存
1、落地保存的好處
消息保存在資料庫中方便人工和程序處理雖然RabbitMQ具備消息持久化的特性,但是它畢竟不是存儲設備。相比較資料庫而言,數據的可讀性、易用性和穩定性都要遜色。
事後方便業務對帳通常消息發送和接收確認後,RabbitMQ中不再持久化消息了。這時一旦業務對帳出現了問題,消息不存在則無法追溯,無法對異常情況進行復位排差。
2、落地保存的具體位置/時機
詳見本文開頭的文章
發送消息到Exchange成功時發送消息到Exchange失敗時發送消息到Queue失敗時接收消息成功時接收消息失敗時
02消息是否設置過期
根據實際需要來決定
1、設置過期時間的好處
消息及時釋放/刪除會減輕RabbitMQ對磁碟佔用的壓力從業務角度來說,消息的持久化畢竟是短暫的或瞬態的,不需要像資料庫那樣永久保存數據
即使出現異常也不會對RabbitMQ造成磁碟壓力只要配合消息落地,就算有異常產生,在設計層面可以依賴資料庫保證數據的永久可追溯
2、設置過期時間的實現
對Queue或Message都可設置過期時間大多數場景會對Queue設置過期時間
當同時設置過期時間時,以最小的過期時間為準
03消息是否設置死信隊列
根據實際需要來決定
1、設置死信隊列的好處
保證消費的隊列沒有垃圾數據由於通常消息是持久化的,原隊列也就是業務隊列中的數據會不斷進入新的消息。這時如果存在異常消息(也就是消費端處理過程中發生異常而無法處理)長時間存放在原隊列是沒有意義的。因為這通常需要消費端排查問題並修復代碼。
保證消費的隊列高效運行由於沒有死信(處理不了的異常消息),隊列消費的速度會很高效。反之,如果重複入隊,效率會降低。如果不重複入隊,需要有追溯機制來跟進和解決異常問題。
便於保存異常消息並定位問題2、設置死信隊列的實現
之前的AMQP協議、模型及RabbitMQ常用組件文章中發過這樣一張圖,說明了AMQP的工作原理
在此圖基礎上,我們把死信交換機和死信隊列加上,一張圖就能看明白了(請關注紅色箭頭)
1、當Consumer1從Queue(user1)中接收消息失敗時,調用API告訴RabbitMQ拋棄錯誤消息
2、由於Queue(user1)設置了死信交換機,被拋棄的消息成為死信進入死信交換機user1DLXExchange
3、由於Queue(user1DLX)綁定了死信交換機,死信隊列會接收到被拋棄的消息
我們將之前文章中的代碼稍加改造就可以了
04小結
今天主要介紹了如何構建RabbitMQ異常處理
小夥伴們都了解了嗎?
下次小Top將繼續介紹RabbitMQ開發
對於今天的內容有任何疑問或問題,歡迎留言或討論 ^-^
05本文涉及的代碼
/**
* 死信交換機user1名稱
*/
public static final String MQ_DLX_EXCHANGE_USER1 = "user1DLXExchange";
/**
* 死信交換機user2名稱
*/
public static final String MQ_DLX_EXCHANGE_USER2 = "user2DLXExchange";
/**
* 死信隊列user1名稱
*/
public static final String MQ_DLX_QUEUE_USER1 = "user1DLX";
/**
* 死信隊列user2名稱
*/
public static final String MQ_DLX_QUEUE_USER2 = "user2DLX";
@Bean
public Declarables declarables() {
//Todo: user1指定死信交換機deadLetterExchange
Queue userQueue1 = QueueBuilder
.durable(MQ_QUEUE_USER1)
.deadLetterExchange(MQ_DLX_EXCHANGE_USER1)
.build();
//Todo: user2指定死信交換機deadLetterExchange
Queue userQueue2 = QueueBuilder
.durable(MQ_QUEUE_USER2)
.deadLetterExchange(MQ_DLX_EXCHANGE_USER2)
.build();
FanoutExchange fanoutExchange = ExchangeBuilder.
fanoutExchange(MQ_FANOUT_EXCHANGE_USER)
.durable(true)
.build();
DirectExchange directExchange = ExchangeBuilder
.directExchange(MQ_FANOUT_EXCHANGE_TEST)
.durable(true)
.build();
//Todo: 定義死信隊列
Queue userDLXQueue1 = QueueBuilder
.durable(MQ_DLX_QUEUE_USER1)
.ttl(300000)
.build();
Queue userDLXQueue2 = QueueBuilder
.durable(MQ_DLX_QUEUE_USER2)
.ttl(300000)
.build();
//Todo: 定義死信隊列交換機
FanoutExchange dlxUser1Exchange = ExchangeBuilder
.fanoutExchange(MQ_DLX_EXCHANGE_USER1)
.durable(true)
.build();
FanoutExchange dlxUser2Exchange = ExchangeBuilder
.fanoutExchange(MQ_DLX_EXCHANGE_USER2)
.durable(true)
.build();
return new Declarables(userQueue1, userQueue2, fanoutExchange, directExchange,
userDLXQueue1, userDLXQueue2, dlxUser1Exchange, dlxUser2Exchange,
BindingBuilder.bind(userQueue1).to(fanoutExchange),
BindingBuilder.bind(userQueue2).to(fanoutExchange),
BindingBuilder.bind(userDLXQueue1).to(dlxUser1Exchange),
BindingBuilder.bind(userDLXQueue2).to(dlxUser2Exchange));
}