消息隊列:Rabbitmq如何保證不丟消息

2021-02-19 灰子學技術

背景介紹:

筆者最近研究了下rabbitmq,便很好奇它是怎麼保證不丟失消息的呢?於是便整理了這篇文章來跟大家分享下,自己的理解,如有不準確的地方或者不同的意見,還請各位能夠給出反饋,我們可以討論,相互學習,相互成長。

基礎知識:

在開始探討這個問題之前,筆者還是覺得很有必要將rabbitmq的架構等基礎知識回顧下,如下所示:


對於使用rabbitmq的服務來說,主要由三部分構成,它們分別是:生產者,rabbitmq,消費者。這三者之間是通過網絡來進行通訊的,其中與生產者對應的是exchange,與消費者對應的是connection,而rabbitmq內部又由exchange,queue,connection三部分構成。

消息的流程:消息是由生產者生產了之後,上報給exchange,exchange綁定並存儲到queue中,再傳遞給最終的消費者手裡。

如此以來,整個過程就分成了三大場景:

場景1: 生產者與exchange的上報消息,如何保證不丟失?

對於網絡通訊來說,解決丟數據最好的辦法就是,消息確認機制,而rabbitmq裡面是通過兩個方式來保證:一種是事務機制,這個是在amqp協議層面保證的,具體操作如下所示:

RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback(), txSelect用於將當前channel設置成transaction模式,txCommit用於提交事務,txRollback用於回滾事務,在通過txSelect開啟事務之後,我們便可以發布消息給broker代理伺服器了,如果txCommit提交成功了,則消息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由於其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。(備註:採用事務機制實現會降低RabbitMQ的消息吞吐量。)

步驟為:

1.client----發送>Tx.Select2.broker----發送>Tx.Select-Ok(之後publish)3.client-發送>Tx.Commit4.broker-發送---->Tx.Commit-Ok

一種是confrim機制:

原理:消息響應機制,

生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那麼確認消息會將消息寫入磁碟之後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號。

confrim的優勢是,它是異步的,在生產者發送完一個消息之後,不必要等這個消息的返回,就可以繼續處理另外一個消息,等待消息的ack返回之後,再去處理前面發過的消息,類似於多路復用的做法。rabbitmq在收到之後,會回復ack,如果因為rabbitmq自身的問題導致的,會回復nack消息。

對於生產者來說,為了方便確認消息有沒有真正到達rabbitmq端,還需要在生產者端設置超時重發,畢竟網絡裡面是可能丟失消息的。

confrim方式使用的API:

https://godoc.org/github.com/streadway/amqp#Channel.Confirm

場景2: 消費者從queue中獲取消息如何保證不丟失?

消息在到達了rabbitmq之後,會將數據保存到queue裡面,queue是存到內存裡面的,不過rabbitmq提供了持久化的操作,這個策略如下所示:

1.buffer大約1M左右,寫滿之後,就會寫到磁碟中。2.25ms超時時間,buffer不滿的話,超時也會寫到磁碟中。

儘管如此,也有可能會丟數據,特別是當rabbitmq在buffer沒有寫到磁碟的時候,就死掉了。不過rabbitmq也提供了鏡像隊列的方式,利用主備的方式來防止消息丟掉,不過當master和salve同時掛掉的話,還是會丟數據,只不過這種同時掛掉的概率會小很多。(筆者覺得,沒有百分之百的不丟消息,只是丟消息的概率變的很低而已。)

參考文章:https://blog.csdn.net/u013256816/article/details/60875666

場景3: rabbitmq內部如何保證不丟失消息?

對於消費者來說,同樣也是採用了消息響應的方式來防止消息不丟失,不過在這一層使用的是ack機制來處理,不過這裡的ack可以設置成不等待ack和等待ack兩種,在這裡我們使用的是設置ack。

消費者對於消息的響應通常有下面三個場景:

1.消費者在接收到rabbitmq的消息之後,等處理完消息之後,會主動回復ack消息,rabbitmq在收到ack之後,便會繼續給這個消費者分配下一個消息進行處理。

2.當然rabbitmq也可以回復unack消息,如此以來消息隊列下一次還會繼續將這個消息分配給消費者,來實現消息重處理。

3.消費者先把ack消息回復掉,然後在重新將這個消息放到rabbitmq之中,如此以來通過rabbitmq的隊列特性來實現,消息的重試,這裡的重試,不是一直處理這一個消息,而是要等到隊列裡面的消息排隊到它才行。

除此之外,rabbitmq還實現了批量的概念,通過qos來設置一次性分配給一個消費者的最大數量,讓消費者一次性消費一批消息,等到處理完了這一批消息,再去分配下一批消息給這一個消費者。

問題1:一旦消費者長時間不回復Ack消息或者消費者卡死了呢,這種場景如何處理?

理論上需要消費者需要實現一個超時處理機制,在一定時間內沒有處理完畢,需要超時回復ack或者unack消息給rabbitmq。

問題2:就算消費者有超時機制,可是一旦消費者在發送ack給rabbitmq的時候,消息丟失,rabbitmq這個消息一直收不到響應消息的話,會怎麼辦呢?

rabbitmq提供了一個可以針對消費者掛掉之後的處理機制,在消費者掛掉之後,會探測出來,然後進行後續處理。

rabbitmq還有一個ttl的功能,可以針對消息隊列或者單個消息設置對應的ttl值,一旦ttl超時,消息就會變為dead message, 不會再分配給消費者。在這裡我們可以採用這個策略,在消息變成死消息之後,我們可以讓生產者再次生產相同的消息放到rabbitmq當中,如果確定這個消息不在使用了,就直接丟棄這個消息。

可以參看:https://blog.csdn.net/u013256816/article/details/54916011

消費者ack相關api:

https://godoc.org/github.com/streadway/amqp#Channel.Ack

備註:如有不妥之處,歡迎大家指正,討論。

因為公眾號沒有評論功能,所以選了一個第三方小程序做評論入口,大家可以試用下,不好用的話,我直接移除,在研究其他評論方式。

留言評論

相關焦點

  • 詳解SpringCloud中RabbitMQ消息隊列原理及配置,一篇就夠!
    ,在rabbitmq中,存儲的消息可以是任意的java類型的對象。這種交換器會將接收到的消息發送給綁定的所有隊列中。當Producer發送消息到RabbitMQ時,交換器會將消息發送到已綁定的所有隊列中,這個過程交換器不會嘗試匹配路由鍵,所以消息中不需要提供路由鍵信息。Consumer仍舊註冊監聽器到隊列,監聽隊列狀態,當隊列狀態發生變化,消費消息。註冊監聽器需要提供交換器信息和隊列信息。
  • rabbitmq隊列之發送消息到指定隊列
    這裡是你的朝花夕拾、樂於分享動漫、生活小竅門、java程序小工具等給大家今天來分享一個:寫入消息到rabbitmq的java開發小工具類。話不多說,現在就上代碼:1、首先是mqConfig.properties配置文件,內容如下:#mq地址、帳戶密碼配置url=192.168.10.18
  • RabbitMQ是如何確定消息是否投遞到隊列中的
    前言在使用RabbitMQ消息中間件時,因為消息的投遞是異步的,默認情況下,RabbitMQ會刪除那些無法路由的消息。為了能夠檢出消息是否順利投遞到隊列,我們需要相應的處理機制。今天就來驗證一下相關的驗證機制。2. 消息投遞失敗那麼哪些情況消息會投遞失敗呢?
  • 常見Rabbitmq面試題及答案總結
    使用topic交換器時,可 以使用通配符9、 如何確保消息不丟失?MQ掛了,整套系統崩潰了,你不就完了麼。(2) 系統複雜性提高硬生生加個MQ進來,你怎麼保證消息沒有重複消費?怎麼處理消息丟失的情況? 怎麼保證消息傳遞的順序性?
  • RabbitMQ的5種核心消息模式都不懂,也敢說會用消息隊列
    5種消息模式這5種消息模式是構建基於RabbitMQ的消息應用的基礎,一定要牢牢掌握它們。學過RabbitMQ的朋友應該了解過這些消息模式的Java實現,這裡我們使用Spring AMQP的形式來實現它們。簡單模式簡單模式是最簡單的消息模式,它包含一個生產者、一個消費者和一個隊列。生產者向隊列裡發送消息,消費者從隊列中獲取消息並消費。
  • 輕量級消息隊列RedisQueue
    消息隊列(Message Queue)是分布式系統必不可少的中間件,大部分消息隊列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用於中小團隊,並且對.NET技術的支持力度不夠。而Redis實現的輕量級消息隊列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!
  • Spring全家桶、Dubbo、分布式、消息隊列後端必備全套開源項目
    《Spring Boot 消息隊列 RocketMQ 入門》 對應 lab-31《Spring Boot 消息隊列 Kafka 入門》 對應 lab-03-kafka《Spring Boot 消息隊列 RabbitMQ 入門》 對應
  • 在K8S上部署rabbitmq集群-有狀態服務
    按照傳統的方式,下單過程要等到調用完畢之後才能返回下單成功,如果網絡產生波動等原因使得商品服務扣庫存延遲或者失敗,會帶來較差的用戶體驗,如果在高並發的場景下,這樣的處理顯然是不合適的,那怎麼進行優化呢?這就需要消息隊列登場了。消息隊列提供一個異步通信機制,消息的發送者不必一直等待到消息被成功處理才返回,而是立即返回。
  • 《RabbitMQ》如何保證消息的可靠性
    :隊列的其他屬性參數,有如下可選項,可參看圖2的arguments:x-message-ttl:消息的過期時間,單位:毫秒;x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;x-max-length-bytes
  • 面試官再問我如何保證 RocketMQ 不丟失消息,這回我笑了!
    最近看了 @JavaGuide發布的一篇『面試官問我如何保證Kafka不丟失消息?我哭了!』,這篇文章承接這個主題,來聊聊如何保證 RocketMQ 不丟失消息。0x00.隨後 Broker 定期批量的將一組消息從內存異步刷入磁碟。這種方式減少 I/O 次數,可以取得更好的性能,但是如果發生機器掉電,異常宕機等情況,消息還未及時刷入磁碟,就會出現丟失消息的情況。若想保證 Broker 端不丟消息,保證消息的可靠性,我們需要將消息保存機制修改為同步刷盤方式,即消息存儲磁碟成功,才會返迴響應。
  • SpringBoot+RabbitMQ (保證消息100%投遞成功並被消費)
    三、項目介紹springboot版本2.1.5.RELEASE, 舊版本可能有些配置屬性不能使用, 需要以代碼形式進行配置RabbitConfig: rabbitmq相關配置TestServiceImpl: 生產者, 發送消息MailConsumer: 消費者, 消費消息, 發送郵件ResendMsg: 定時任務, 重新投遞發送失敗的消息說明
  • Springboot2.2.6構建RabbitMQ消息異常處理
    >大家好,我是技術人小Top今天咱們來介紹如何使用RabbitMQ構建消息異常處理 ^-^事後方便業務對帳通常消息發送和接收確認後,RabbitMQ中不再持久化消息了。這時一旦業務對帳出現了問題,消息不存在則無法追溯,無法對異常情況進行復位排差。
  • 淺入淺出消息隊列
    相信在學生時代大家都遇到過上面的這種情況,如果我們將在學校上課抽象成一個系統,那這種情況就是一個很常見的消息隊列的使用場景。在上述實例中,要提的問題就是**「消息」,提問題的學生是「生產者」,回答問題的老師是「消費者」,收集問題的課代表是「消息隊列」**。
  • 實戰|SpringBoot+RabbitMQ,保證消息100%投遞成功並被消費
    所以, 再貼一遍)1、驗證消息發送到Exchange失敗情況下的回調, 對應上圖P -> X如何驗證?, 對應上圖X -> Q同理, 修改一下路由鍵為不存在的即可, 路由失敗, 觸發回調發送失敗, 原因: route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE3、驗證在手動ack模式下, 消費端必須進行手動確認(ack), 否則消息會一直保存在隊列中, 直到被消費, 對應上圖Q
  • 如何使用Spring Boot與RabbitMQ結合實現延遲隊列
    比如消費者從隊列裡消費消息時失敗了,但是想要延遲一段時間後自動重試。如果不使用延遲隊列,那麼我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便於開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。如何實現?別急,在下文中,我們將詳細介紹如何利用 Spring Boot 加 RabbitMQ 來實現延遲隊列。
  • IM開發基礎知識補課(五):通俗易懂,正確理解並用好MQ消息隊列
    (一):保證在線實時消息的可靠投遞》《IM消息送達保證機制實現(二):保證離線消息的可靠投遞》《如何保證IM實時消息的「時序性」與「一致性」?》《IM群聊消息如此複雜,如何保證不丟不重?》《一種Android端IM智能心跳算法的設計與實現探討(含樣例代碼)》《移動端IM登錄時拉取數據如何作到省流量?》
  • 五分鐘學後端技術:如何學習後端工程師必學的消息隊列
    什麼是消息隊列「RabbitMQ?」「Kafka?」「RocketMQ?」...在日常學習與開發過程中,我們常常聽到消息隊列這個關鍵詞,可能你是熟練使用消息隊列的老手,又或者你是不懂消息隊列的新手,不論你了不了解消息隊列,本文都將帶你搞懂消息隊列的一些基本理論。
  • 10月阿里最新38道Java面試題解析(MyBatis+消息隊列+Redis)
    A 系統是重發還是先把消息保存起來呢?使用消息隊列就可以解決這個問題。A 系統只負責生產數據,不需要考慮消息被哪個系統來消費。A 系統需要發送個請求給 B 系統處理,由於 B 系統需要查詢資料庫花費時間較長,以至於 A 系統要等待 B 系統處理完畢後再發送下個請求,造成 A 系統資源浪費。使用消息隊列後,A 系統生產完消息後直接丟進消息隊列,不用等待 B 系統的結果,直接繼續去幹自己的事情了。
  • 如何理解Kafka的消息可靠性策略?
    遇到各種故障時,我的消息會不會丟?消費者側會收到多條消息嗎?消費者svr重啟後消息會丟失嗎?這些問題都很正常,在開始接觸和使用時總會有這樣或那樣的問題。 一般情況下,不做了解,使用各種默認的推薦值,也是可以work的。 但是我們要優雅的提升自己的姿(知)勢(識)。
  • 這些MQ概念你都懂嗎:死信隊列、重試隊列、消息回溯等
    消息隊列(MQ)的基本概念,很多時候都要了解清楚,這樣在學消息隊列中間件就比較能夠遊刃有餘,遇到不清楚的也可以重新翻來看看,加深理解。這裡有關於:優先級隊列、延遲隊列、死信隊列、重試隊列、消息回溯、消息堆積、消息追蹤/消息軌跡、消息過濾、消息審計、消息路由等的介紹。