《RabbitMQ》如何保證消息的可靠性

2020-12-08 cuixiaoyande

一條消費成功被消費經歷了生產者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。

一 消息生產者沒有把消息成功發送到MQ

1.1 事務機制

AMQP協議提供了事務機制,在投遞消息時開啟事務支持,如果消息投遞失敗,則回滾事務。

自定義事務管理器

@Configurationpublic class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); }}

修改yml

spring: rabbitmq: # 消息在未被隊列收到的情況下返回 publisher-returns: true

開啟事務支持

rabbitTemplate.setChannelTransacted(true);

消息未接收時調用ReturnCallback

rabbitTemplate.setMandatory(true);

生產者投遞消息

@Servicepublic class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 設置channel開啟事務 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("這條消息發送失敗了"+message+",請處理"); } @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager") public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); }}

但是,很少有人這麼幹,因為這是同步操作,一條消息發送之後會使發送端阻塞,以等待RabbitMQ-Server的回應,之後才能繼續發送下一條消息,生產者生產消息的吞吐量和性能都會大大降低。

1.2 發送方確認機制

發送消息時將信道設置為confirm模式,消息進入該信道後,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列後,RabbitMQ就會發送給生產者一個確認。

開啟消息確認機制

spring: rabbitmq: # 消息在未被隊列收到的情況下返回 publisher-returns: true # 開啟消息確認機制 publisher-confirm-type: correlated

消息未接收時調用ReturnCallback

rabbitTemplate.setMandatory(true);

生產者投遞消息

@Servicepublic class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("確認了這條消息:"+correlationData); }else{ System.out.println("確認失敗了:"+correlationData+";出現異常:"+cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("這條消息發送失敗了"+message+",請處理"); } public void publisMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); }}

如果消息確認失敗後,我們可以進行消息補償,也就是消息的重試機制。當未收到確認信息時進行消息的重新投遞。設置如下配置即可完成。

spring: rabbitmq: # 支持消息發送失敗後重返隊列 publisher-returns: true # 開啟消息確認機制 publisher-confirm-type: correlated listener: simple: retry: # 開啟重試 enabled: true # 最大重試次數 max-attempts: 5 # 重試時間間隔 initial-interval: 3000

二 消息發送到MQ後,MQ宕機導致內存中的消息丟失

消息在MQ中有可能發生丟失,這時候我們就需要將隊列和消息都進行持久化。

@Queue註解為我們提供了隊列相關的一些屬性,具體如下:

name: 隊列的名稱;durable: 是否持久化;exclusive: 是否獨享、排外的;autoDelete: 是否自動刪除;arguments:隊列的其他屬性參數,有如下可選項,可參看圖2的arguments:x-message-ttl:消息的過期時間,單位:毫秒;x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;x-max-length-bytes:隊列消息內容佔用最大空間,受限於內存大小,超過該閾值則從隊列頭部開始刪除消息;x-overflow:設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發生什麼。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發送到該交換器中;x-dead-letter-routing-key:死信消息路由鍵,在消息發送到死信交換器時會使用該路由鍵,如果不設置,則使用消息的原來的路由鍵值x-single-active-consumer:表示隊列是否是單一活動消費者,true時,註冊的消費組內只有一個消費者消費消息,其他被忽略,false時消息循環分發給所有消費者(默認false)x-max-priority:隊列要支持的最大優先級數;如果未設置,隊列將不支持消息優先級;x-queue-mode(Lazy mode):將隊列設置為延遲模式,在磁碟上保留儘可能多的消息,以減少RAM的使用;如果未設置,隊列將保留內存緩存以儘可能快地傳遞消息;x-queue-master-locator:在集群模式下設置鏡像隊列的主節點信息。持久化隊列

創建隊列的時候將持久化屬性durable設置為true,同時要將autoDelete設置為false

@Queue(value = "javatrip",durable = "false",autoDelete = "false")

持久化消息

發送消息的時候將消息的deliveryMode設置為2,在Spring Boot中消息默認就是持久化的。

三 消費者消費消息的時候,未消費完畢就出現了異常

消費者剛消費了消息,還沒有處理業務,結果發生異常。這時候就需要關閉自動確認,改為手動確認消息。

修改yml為手動籤收模式

spring: rabbitmq: listener: simple: # 手動籤收模式 acknowledge-mode: manual # 每次籤收一條消息 prefetch: 1

消費者手動籤收

@Component@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))public class Consumer { @RabbitHandler public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{ System.out.println(message); // 唯一的消息ID Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 確認該條消息 if(...){ channel.basicAck(deliverTag,false); }else{ // 消費失敗,消息重返隊列 channel.basicNack(deliverTag,false,true); } }}

四 總結

消息丟失的原因?

生產者、MQ、消費者都有可能造成消息丟失

如何保證消息的可靠性?

發送方採取發送者確認模式MQ進行隊列及消息的持久化消費者消費成功後手動確認消息

相關焦點

  • SpringBoot+RabbitMQ (保證消息100%投遞成功並被消費)
    spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest# 開啟confirms回調 P -> Exchangespring.rabbitmq.publisher-confirms
  • 實戰|SpringBoot+RabbitMQ,保證消息100%投遞成功並被消費
    、郵箱配置# rabbitmqspring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest# 開啟confirms回調 P -> Exchangespring.rabbitmq.publisher-confirms
  • rabbitmq功能介紹
    一個producer發布任務,一堆consumers接收到任務後進行處理。這種模式極為好用,但其結構卻很簡單。只要兩個信息:1)connection uri,2)queue的名字,然後啟動生產者和消費者就可以工作。 但是,本文想聊一聊的是rabbitmq所提供的更多功能。可以用它們來實現更多的模式,完成更複雜的功能。 首先,需要把rabbitmq的幾個基本概念說明一下。
  • RabbitMQ 3.0.3 發布,高級消息隊列服務
    基於Erlang的高級消息隊列RabbitMQ 3.0.3 發布。2013-03-06 之前版本是2013-01-31的3.0.2 主要是bug 修復。
  • 消息隊列:Rabbitmq如何保證不丟消息
    背景介紹:筆者最近研究了下rabbitmq,便很好奇它是怎麼保證不丟失消息的呢?
  • 想了解 Kafka, RabbitMQ, ZeroMQ, RocketMQ, ActiveMQ 之間的差異?這一篇文章就夠了!
    activemq:內存、磁碟、資料庫。支持少量堆積。Kafka:支持rabbitmq:支持。客戶端將信道設置為事務模式,只有當消息被rabbitMq接收,事務才能提交成功,否則在捕獲異常後進行回滾。使用事務會使得性能有所下降zeromq:不支持rocketmq:支持activemq:支持Kafka:支持負載均衡。
  • RabbitMQ 3.3.5 發布,AMQP 消息伺服器
    基於Erlang的高級消息隊列RabbitMQ 3.3.5發布。2014-08-11。
  • RabbitMQ 消費端限流、TTL、死信隊列
    為什麼要對消費端限流假設一個場景,首先,我們 Rabbitmq 伺服器積壓了有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現這樣情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多數據!
  • RabbitMQ 3.7.8-rc.4 發布,多協議消息代理
    Priority queues no longer fail with an exception when used together with other rabbit_backing_queue behaviour implementations.
  • mac如何安裝和啟動RabbitMQ
    1.通過brew指令執行安裝rabbitmq的命令,如下:brew install rabbitmq【小知識】Brew又叫Homebrew,是MAC中的一款軟體包管理工具,通過brew可以很方便的在MAC中安裝軟體或者是卸載軟體。
  • 想了解Kafka,RabbitMQ,ZeroMQ,RocketMQ,ActiveMQ之間的差異?這一篇文章就夠了!
    activemq:內存、磁碟、資料庫。支持少量堆積。Kafka:支持rabbitmq:支持。客戶端將信道設置為事務模式,只有當消息被rabbitMq接收,事務才能提交成功,否則在捕獲異常後進行回滾。使用事務會使得性能有所下降zeromq:不支持rocketmq:支持activemq:支持Kafka:支持負載均衡。
  • 阿里P8精心整理MongoDB+RabbitMQ+Memcached面試題,100%拿offer
    Memcached 23道面試題答案第二版: RabbitMQ 12道1、什麼是rabbitmq2、為什麼要使用rabbitmq-3、使用rabbitmq的場景4、如何確保消息正確地發送至RabbitMQ?
  • RabbitMQ 3.6.16 和 3.7.6 發布,多協議消息代理
    GitHub issue: rabbitmq-server#1519Maximum supported number of queue priorities (255) is now enforced.
  • 熬夜整理的RabbitMQ知識點相當齊全的文章
    、高可用性等方面表現還挺不俗,具體特點有:(1)可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、消費確認、發布確認;(2)靈活的路由:在消息進入隊列之前,通過交換器Exchange 來路由消息的;對於典型的路由功能,RabbitMQ 已經提供了一些內置Exchange來實現;針對更複雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange;(3)
  • RabbitMQ 3.8.8 發布,引入維護模式
    關閉所有現有的客戶端連接:應用程式應重新連接到其它節點並恢復 所有經典鏡像隊列的主副本託管在傳輸目標節點上 所有仲裁隊列的主要副本託管在傳輸目標節點上,並阻止它們參與隨後觸發的 Raft 選舉 將節點標記為關閉以進行維護 此時,由於節點已經轉移了大部分職責,因此節點關閉的破壞性將最小不管隊列類型和使用的隊列主定位器策略如何
  • 詳解SpringCloud中RabbitMQ消息隊列原理及配置,一篇就夠!
    # rabbitmq安裝位置spring.rabbitmq.host=localhost# rabbitmq的埠spring.rabbitmq.port=5672# rabbitmq的用戶名spring.rabbitmq.username=test
  • 消息中間件你該了解的秘密
    在使用消息中間件的過程中我們需要了解以下場景:如何與我們的開發框架SpringBoot進行集成如何發送消息如何發送複雜消息如何保證發送消息的可靠性如何消費消息如何保證消費消息的可靠性如何保證消費者的可擴展性如何使用消費者進行流量削峰以這些場景為基礎開啟本文的寫作,本文是消息中間件RabbitMQ為例2.
  • RabbitMQ 消息隊列工作原理全揭秘!(附MQ官方中文文檔下載方式!)
    RabbitMQ的官網是http://www.rabbitmq.com2. 應用場景言歸正傳。RabbitMQ,或者說AMQP解決了什麼問題,或者說它的應用場景是什麼?2)如何降低發送者和接收者的耦合度?3)如何讓Priority高的接收者先接到數據?4)如何做到load balance?有效均衡接收者的負載?
  • RabbitMQ 3.7.8 RC3 發布,Erlang 的 AMQP 開源實現
    rabbitmq-echopid.bat nowloads rabbitmq-env.bat correctly.rabbitmqadmin now supports a new argument, --request-timeout.Switching sections will now scroll to the top of the newly rendered page.
  • RabbitMQ 高頻考點
    系統複雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重複消費,如何保證保證消息可靠傳輸。因此需要考慮的東西更多,系統複雜性增大。2 常見的 MQ消息中間件具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。