用了 springboot + rabbitmq 消息確認機制,我感覺掉坑裡了

2020-09-03 程式設計師內點事

最近部門號召大夥多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這 T M 就是為了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多做技術交流還是很有助於個人成長的。

於是乎我主動報名參加了分享,咳咳咳~ ,真的不是為了那點KPI,就是想和大夥一起學習學習!

這次我分享的是 springboot + rabbitmq 如何實現消息確認機制,以及在實際開發中的一點踩坑經驗,其實整體的內容比較簡單,有時候事情就是這麼神奇,越是簡單的東西就越容易出錯。

可以看到使用了 RabbitMQ 以後,我們的業務鏈路明顯變長了,雖然做到了系統間的解耦,但可能造成消息丟失的場景也增加了。例如:

  • 消息生產者 - > rabbitmq伺服器(消息發送失敗)
  • rabbitmq伺服器自身故障導致消息丟失
  • 消息消費者 - > rabbitmq服務(消費消息失敗)

所以說能不使用中間件就儘量不要用,如果為了用而用只會徒增煩惱。開啟消息確認機制以後,儘管很大程度上保證了消息的準確送達,但由於頻繁的確認交互,rabbitmq 整體效率變低,吞吐量下降嚴重,不是非常重要的消息真心不建議你用消息確認機制。


下邊我們先來實現springboot + rabbitmq消息確認機制,再對遇到的問題做具體分析。

一、準備環境

1、引入 rabbitmq 依賴包

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、修改 application.properties 配置

配置中需要開啟 發送端和 消費端 的消息確認。

spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest 發送者開啟 return 確認機制spring.rabbitmq.publisher-returns=true 是否支持重試spring.rabbitmq.listener.simple.retry.enabled=true

3、定義 exchange 和 Queue

定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,並將隊列綁定在交換機上。

@Configurationpublic class QueueConfig {    @Bean(name = &34;)    public Queue confirmTestQueue() {        return new Queue(&34;, true, false, false);    }    @Bean(name = &34;)    public FanoutExchange confirmTestExchange() {        return new FanoutExchange(&34;);    }    @Bean    public Binding confirmTestFanoutExchangeAndQueue(            @Qualifier(&34;) FanoutExchange confirmTestExchange,            @Qualifier(&34;) Queue confirmTestQueue) {        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);    }}

rabbitmq 的消息確認分為兩部分:發送消息確認 和 消息接收確認。

在這裡插入圖片描述

二、消息發送確認

發送消息確認:用來確認生產者 producer 將消息發送到 broker ,broker 上的交換機 exchange 再投遞給隊列 queue的過程中,消息是否成功投遞。

消息從 producer 到 rabbitmq broker有一個 confirmCallback 確認模式。

消息從 exchange 到 queue 投遞失敗有一個 returnCallback 退回模式。

我們可以利用這兩個Callback來確保消的100%送達。

1、 ConfirmCallback確認模式

消息只要被 rabbitmq broker 接收到就會觸發 confirmCallback 回調 。

@Slf4j@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {        @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if (!ack) {            log.error(&34;);        } else {            log.info(&34;, correlationData.getId(), ack, cause);        }    }}

實現接口 ConfirmCallback ,重寫其confirm()方法,方法內有三個參數correlationData、ack、cause。

  • correlationData:對象內部只有一個 id 屬性,用來表示當前消息的唯一性。
  • ack:消息投遞到broker 的狀態,true表示成功。
  • cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經到達 MQ伺服器,並不能保證消息一定會被投遞到目標 queue 裡。所以接下來需要用到 returnCallback 。

2、 ReturnCallback 退回模式

如果消息未能投遞到目標 queue 裡將觸發回調 returnCallback ,一旦向 queue 投遞消息未成功,這裡一般會記錄下當前消息的詳細投遞數據,方便後續做重發或者補償等操作。

@Slf4j@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        log.info(&34;, replyCode, replyText, exchange, routingKey);    }}

實現接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey(隊列)。

下邊是具體的消息發送,在rabbitTemplate中設置 Confirm 和 Return 回調,我們通過setDeliveryMode()對消息做持久化處理,為了後續測試創建一個 CorrelationData對象,添加一個id 為10000000000。

@Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private ConfirmCallbackService confirmCallbackService;    @Autowired    private ReturnCallbackService returnCallbackService;    public void sendMessage(String exchange, String routingKey, Object msg) {        /**         * 確保消息發送失敗後可以重新返回到隊列中         * 注意:yml需要配置 publisher-returns: true         */        rabbitTemplate.setMandatory(true);        /**         * 消費者確認收到消息後,手動ack回執回調處理         */        rabbitTemplate.setConfirmCallback(confirmCallbackService);        /**         * 消息投遞到隊列失敗回調處理         */        rabbitTemplate.setReturnCallback(returnCallbackService);        /**         * 發送消息         */        rabbitTemplate.convertAndSend(exchange, routingKey, msg,                message -> {                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);                    return message;                },                new CorrelationData(UUID.randomUUID().toString()));    }

三、消息接收確認

消息接收確認要比消息發送確認簡單一點,因為只有一個消息回執(ack)的過程。使用@RabbitHandler註解標註的方法要增加 channel(信道)、message 兩個參數。

@Slf4j@Component@RabbitListener(queues = &34;)public class ReceiverMessage1 {        @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {        try {            log.info(&34;, msg);            //TODO 具體業務                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }  catch (Exception e) {                        if (message.getMessageProperties().getRedelivered()) {                                log.error(&34;);                                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息            } else {                                log.error(&34;);                                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);             }        }    }}

消費消息有三種回執方法,我們來分析一下每種方法的含義。

1、basicAck

basicAck:表示成功確認,使用此回執方法後,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞後,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行ack、nack、reject等操作。

multiple:是否批量確認,值為 true 則會一次性 ack所有小於當前消息 deliveryTag 的消息。

舉個慄子 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。

2、basicNack

basicNack :表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投遞序號。

multiple:是否批量確認。

requeue:值為 true 消息將重新入隊列。

3、basicReject

basicReject:拒絕消息,與basicNack區別在於不能進行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投遞序號。

requeue:值為 true 消息將重新入隊列。

四、測試

發送消息測試一下消息確認機制是否生效,從執行結果上看發送者發消息後成功回調,消費端成功的消費了消息。

用抓包工具Wireshark 觀察一下rabbitmq amqp協議交互的變化,也多了 ack 的過程。

五、踩坑日誌

1、不消息確認

這是一個非常沒技術含量的坑,但卻是非常容易犯錯的地方。

開啟消息確認機制,消費消息別忘了channel.basicAck,否則消息會一直存在,導致重複消費。

2、消息無限投遞

在我最開始接觸消息確認機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯後確認消息, int a = 1 / 0 發生異常後將消息重新投入隊列。

@RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {        try {            log.info(&34;, msg);            int a = 1 / 0;            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);        }    }

但是有個問題是,業務代碼一旦出現 bug 99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,導致了死循環。

在這裡插入圖片描述

本地的CPU被瞬間打滿了,大家可以想像一下當時在生產環境導致服務死機,我是有多慌。

而且rabbitmq management 只有一條未被確認的消息。

在這裡插入圖片描述

經過測試分析發現,當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

消費者會立刻消費這條消息,業務處理再拋出異常,消息再重新入隊,如此反覆進行。導致消息隊列處理出現阻塞,導致正常消息也無法運行。

而我們當時的解決方案是,先將消息進行應答,此時消息隊列會刪除該條消息,同時我們再次發送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新發送消息到隊尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,                    JSON.toJSONBytes(msg));

但這種方法並沒有解決根本問題,錯誤消息還是會時不時報錯,後面優化設置了消息重試次數,達到了重試上限以後,手動確認,隊列刪除此消息,並將消息持久化入MySQL並推送報警,進行人工處理和定時任務做補償。

3、重複消費

如何保證 MQ 的消費是冪等性,這個需要根據具體業務而定,可以藉助MySQL、或者redis 將消息持久化,通過再消息中的唯一性屬性校驗。

demo的 GitHub 地址 https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm


原創不易,燃燒秀髮輸出內容,如果有一丟丟收穫,點個讚鼓勵一下吧!

整理了幾百本各類技術電子書相送 ,噓~,「免費」 送給小夥伴們,私信或者評論【666】自行領取。和一些小夥伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步。

相關焦點

  • springboot優雅的整合rabbitmq,讓削峰進行到底
    &34;這裡是訂閱隊列1,已經接收到消息,參數:{}&34;這裡是訂閱隊列2,已經接收到消息,參數:{}&34;testQueuetoTopic1&34;.34;,這裡的意思就是只要發送topic開頭的隊列,都會、被此路由分發,需要注意一點,這裡的「#」可以用「*」代替。三種廣播模式的消息介紹完成之後我在介紹一種消息模式:消息的確認機制。
  • 實戰|SpringBoot+RabbitMQ,保證消息100%投遞成功並被消費
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫來自:簡書,作者:wangzaiplus連結:https://www.jianshu.com/p/dca01aad6bc8一、先扔一張圖說明:本文涵蓋了關於RabbitMQ很多方面的知識點, 如:消息發送確認機制消費確認機制消息的重新投遞消費冪等性
  • springboot項目整合rabbitmq學習第一步
    springboot項目整合rabbitmq的也是很簡單的。1、前提安裝好rabbitmq。2、pom.xml添加rabbitmq依賴。這個spring-boot-starter-amqp裡面的amqp指的是高級消息隊列協議,而rabbitmq就是amqp協議的一種實現中間件。
  • 【SpringBoot MQ 系列】RabbitListener 消費基本使用姿勢介紹
    配置首先創建一個 SpringBoot 項目,用於後續的演示springboot 版本為2.2.1.RELEASErabbitmq 版本為 3.7.5 (安裝教程可參考: 【MQ 系列】springboot + rabbitmq 初體驗)依賴配置文件
  • springBoot 整合 rabbitMQ,實現事務補償
    rabbitMQ 在網際網路公司有著大規模應用,本篇將實戰介紹 springboot 整合 rabbitMQ,同時也將在具體的業務場景中介紹利用服務信息spring.rabbitmq.addresses=197.168.24.206:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
  • 《RabbitMQ》如何保證消息的可靠性
    一條消費成功被消費經歷了生產者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。一 消息生產者沒有把消息成功發送到MQ1.1 事務機制AMQP協議提供了事務機制,在投遞消息時開啟事務支持,如果消息投遞失敗,則回滾事務。
  • Springboot整合RabbitMQ
    1.8.創建即將安裝的目錄1.9.配置安裝路徑1.2.0.安裝1.2.1.查看一下是否安裝成功1.2.2.添加環境變量1.2.3.刷新環境變量1.2.4.輸入命令驗證安裝是否成功1.2.5.輸入halt().命令退出(點號不要忘記)安裝RabbitMQ2.1.安裝rabbitmq
  • RabbitMQ從入門到精通
    該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,集群內部的網絡帶寬將會被這種同步通訊大大消耗掉。所以在對可靠性要求較高的場合中適用。由於鏡像隊列之間消息自動同步,且內部有選舉master機制,即使master節點宕機也不會影響整個集群的使用,達到去中心化的目的,從而有效的防止消息丟失及服務不可用等問題。
  • RabbitMQ 專題(1) #削峰限流
    ,如果一定數目的消息未被確認,不進行消費新的消息。,// prefetchCount=3:表示預讀取消息數量為3,如果這三條消息均沒有確認,則消費者不再讀取新消息// global=false:表示prefetchCount單獨應用於信道上的每個新消費者channel.basicQos(0,3,false);//不自動回復隊列應答 -- RabbitMQ 中的消息確認機制,// 限流方式// queue
  • SPRINGBOOT + RABBITMQ發送郵件100%
    image.png說明:本文涵蓋了關於RabbitMQ很多方面的知識點, 如:消息發送確認機制消費確認機制消息的重新投遞消費冪等性, 等等這些都是圍繞上面那張整體流程圖展開的, 所以有必要先貼出來, 見圖知意二、實現思路
  • 帶你熟悉 SpringBoot RabbitMQ 收發消息
    比如我的代碼這裡就是設置了一下消息的類型,消息的類型有很多種可以是二進位類型,文本類型,或者序列化類型,JSON類型,我這裡設置的就是文本類型,指定類型是必須的,也可以為我們拿到消息之後要將消息轉換成什麼樣的對象提供一個參考。
  • 阿里巴巴最新消息中間件架構體系—RabbitMQ研究(萬字總結)
    Basic.Consume和Basic.Consume-OK這兩個命令主要用來進行消息消費確認。(1)可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
  • 一篇文章帶你使用 SpringBoot 整合RabbitMQ
    ,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ 底層是用了 Erlang 語言來編寫的,並且 RabbitMQ 是基於 AMQP 協議的. docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management1
  • 膩害,高人都是這樣玩SpringBoot整合RabbitMQ
    ,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ 底層是用了 Erlang 語言來編寫的,並且 RabbitMQ 是基於 AMQP 協議的. docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management1
  • RabbitMQ 的監控
    而且我現在就遇到了這樣的情況,主要是隊列積壓的問題。由於量不是很大,所以磁碟空間倒不是很擔心,但有時程序執行會報錯,導致隊列一直消費不下去,這就很讓人尷尬了。查了一些資料,總結了一下。想要了解 RabbitMQ 的運行狀態,主要有三種途徑:Management UI,rabbitmqctl 命令和 REST API。
  • springboot消息機制掃盲
    、Message(只有消息頭和屬性)bytes[] 當實際應用,有複雜的消息,可以將消息序列化後發送綜合評價在java體系中,client均可JMS進行交互定義了wire-level層的協議標準spring的支持spring-jms提供了對JMS的支持spring-rabbit提供了對
  • 深入剖析 rabbitMQ
    /7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm下載rabbitMQwget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
  • RabbitMQ跨機房遷移數據零丟失
    payload);其中為basicProperties為消息屬性,類型為AMQP.BasicPropertiespublic static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties
  • RabbitMQ 集群高可用原理及實戰部署介紹
    rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm安裝完成之後,修改rabbitmq的配置,默認配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。
  • RabbitMQ 集群入門
    scp /var/lib/rabbitmq/.erlang.cookie 192.168.186.129:/var/lib/rabbitmq/修改 cookie 文件,要重啟 linux@A ~]# rabbitmqctl stop_appStopping rabbit application on node rabbit@A ...