背景介紹:
筆者最近研究了下rabbitmq,便很好奇它是怎麼保證不丟失消息的呢?於是便整理了這篇文章來跟大家分享下,自己的理解,如有不準確的地方或者不同的意見,還請各位能夠給出反饋,我們可以討論,相互學習,相互成長。
基礎知識:
在開始探討這個問題之前,筆者還是覺得很有必要將rabbitmq的架構等基礎知識回顧下,如下所示:
對於使用rabbitmq的服務來說,主要由三部分構成,它們分別是:生產者,rabbitmq,消費者。這三者之間是通過網絡來進行通訊的,其中與生產者對應的是exchange,與消費者對應的是connection,而rabbitmq內部又由exchange,queue,connection三部分構成。
消息的流程:消息是由生產者生產了之後,上報給exchange,exchange綁定並存儲到queue中,再傳遞給最終的消費者手裡。
如此以來,整個過程就分成了三大場景:
場景1: 生產者與exchange的上報消息,如何保證不丟失?
對於網絡通訊來說,解決丟數據最好的辦法就是,消息確認機制,而rabbitmq裡面是通過兩個方式來保證:一種是事務機制,這個是在amqp協議層面保證的,具體操作如下所示:
RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback(), txSelect用於將當前channel設置成transaction模式,txCommit用於提交事務,txRollback用於回滾事務,在通過txSelect開啟事務之後,我們便可以發布消息給broker代理伺服器了,如果txCommit提交成功了,則消息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由於其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。(備註:採用事務機制實現會降低RabbitMQ的消息吞吐量。)
步驟為:
1.client----發送>Tx.Select2.broker----發送>Tx.Select-Ok(之後publish)3.client-發送>Tx.Commit4.broker-發送---->Tx.Commit-Ok一種是confrim機制:
原理:消息響應機制,
生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那麼確認消息會將消息寫入磁碟之後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號。
confrim的優勢是,它是異步的,在生產者發送完一個消息之後,不必要等這個消息的返回,就可以繼續處理另外一個消息,等待消息的ack返回之後,再去處理前面發過的消息,類似於多路復用的做法。rabbitmq在收到之後,會回復ack,如果因為rabbitmq自身的問題導致的,會回復nack消息。
對於生產者來說,為了方便確認消息有沒有真正到達rabbitmq端,還需要在生產者端設置超時重發,畢竟網絡裡面是可能丟失消息的。
confrim方式使用的API:
https://godoc.org/github.com/streadway/amqp#Channel.Confirm
場景2: 消費者從queue中獲取消息如何保證不丟失?
消息在到達了rabbitmq之後,會將數據保存到queue裡面,queue是存到內存裡面的,不過rabbitmq提供了持久化的操作,這個策略如下所示:
1.buffer大約1M左右,寫滿之後,就會寫到磁碟中。2.25ms超時時間,buffer不滿的話,超時也會寫到磁碟中。儘管如此,也有可能會丟數據,特別是當rabbitmq在buffer沒有寫到磁碟的時候,就死掉了。不過rabbitmq也提供了鏡像隊列的方式,利用主備的方式來防止消息丟掉,不過當master和salve同時掛掉的話,還是會丟數據,只不過這種同時掛掉的概率會小很多。(筆者覺得,沒有百分之百的不丟消息,只是丟消息的概率變的很低而已。)
參考文章:https://blog.csdn.net/u013256816/article/details/60875666
場景3: rabbitmq內部如何保證不丟失消息?
對於消費者來說,同樣也是採用了消息響應的方式來防止消息不丟失,不過在這一層使用的是ack機制來處理,不過這裡的ack可以設置成不等待ack和等待ack兩種,在這裡我們使用的是設置ack。
消費者對於消息的響應通常有下面三個場景:
1.消費者在接收到rabbitmq的消息之後,等處理完消息之後,會主動回復ack消息,rabbitmq在收到ack之後,便會繼續給這個消費者分配下一個消息進行處理。
2.當然rabbitmq也可以回復unack消息,如此以來消息隊列下一次還會繼續將這個消息分配給消費者,來實現消息重處理。
3.消費者先把ack消息回復掉,然後在重新將這個消息放到rabbitmq之中,如此以來通過rabbitmq的隊列特性來實現,消息的重試,這裡的重試,不是一直處理這一個消息,而是要等到隊列裡面的消息排隊到它才行。
除此之外,rabbitmq還實現了批量的概念,通過qos來設置一次性分配給一個消費者的最大數量,讓消費者一次性消費一批消息,等到處理完了這一批消息,再去分配下一批消息給這一個消費者。
問題1:一旦消費者長時間不回復Ack消息或者消費者卡死了呢,這種場景如何處理?
理論上需要消費者需要實現一個超時處理機制,在一定時間內沒有處理完畢,需要超時回復ack或者unack消息給rabbitmq。
問題2:就算消費者有超時機制,可是一旦消費者在發送ack給rabbitmq的時候,消息丟失,rabbitmq這個消息一直收不到響應消息的話,會怎麼辦呢?
rabbitmq提供了一個可以針對消費者掛掉之後的處理機制,在消費者掛掉之後,會探測出來,然後進行後續處理。
rabbitmq還有一個ttl的功能,可以針對消息隊列或者單個消息設置對應的ttl值,一旦ttl超時,消息就會變為dead message, 不會再分配給消費者。在這裡我們可以採用這個策略,在消息變成死消息之後,我們可以讓生產者再次生產相同的消息放到rabbitmq當中,如果確定這個消息不在使用了,就直接丟棄這個消息。
可以參看:https://blog.csdn.net/u013256816/article/details/54916011
消費者ack相關api:
https://godoc.org/github.com/streadway/amqp#Channel.Ack
備註:如有不妥之處,歡迎大家指正,討論。
因為公眾號沒有評論功能,所以選了一個第三方小程序做評論入口,大家可以試用下,不好用的話,我直接移除,在研究其他評論方式。
留言評論