Springboot2.2.6構建RabbitMQ消息發布端代碼

2021-01-10 萬裡山川

接續之前文章

AMQP協議、模型及RabbitMQ常用組件消息中間件RabbitMQ、微服務,以及數據一致性問題消息中間件RabbitMQ,為什麼使用RabbitMQ以及它支持的場景

大家好,我是技術人小Top

今天咱們來介紹如何使用RabbitMQ構建消息發布端 ^-^

RabbitMQ官網:www.rabbitmq.com

上次介紹了RabbitMQ底層核心原理,即AMQP協議及模型,以及模型所涉及的組件都是什麼

具體到應用開發,需要使用RabbitMQ API來實現具體業務場景

現在開始進入實戰

01消息發布端項目pom文件

依賴jar包

02消息發布端代碼結構

1、yml/properties配置

包含微服務、資料庫連接池、Mybatis、RabbitMQ等自動配置信息

2、config包

包含RabbitMQ的Bean配置信息

定義並聲明Exchange、Queue定義一個非單例的RabbitTemplate

3、control包

包含Controller層的控制類

模擬發起http業務請求完成業務數據、消息數據落地保存完成並確保消息發送

4、service包

包含Service層的業務類

完成業務數據處理完成消息數據處理完成消息發送和異常處理

5、其他包或配置

其他包的配置不涉及RabbitMQ使用,為節省篇幅此處暫且忽略

03關鍵點說明

1、容易忽略的點

感興趣的小夥伴們可自行測試驗證,有任何想法可交流 ^-^

(1)定義RabbitTemplate的生命周期為prototype

此處定義為prototype,原因在於當有多個消息發送時,發布確認API會校驗RabbitTemplate實例是否唯一(即每個RabbitTemplate只支持一個ConfirmCallback)

當RabbitTemplate為單例時會報異常 」Only one ConfirmCallback is supported by each RabbitTemplate「

(2)配置文件中的 」確認消息是否從Exchange發送到Queue「 配置不生效

在yaml或properties文件中配置 spring.rabbitmq.template.mandatory=true不生效

原因在於 」RabbitTemplate的生命周期定義為prototype「,每次調用rabbitTemplate時會重新生成一個新的rabbitTemplate對象

而RabbitAutoConfiguration類中的內部類RabbitTemplateConfiguration的rabbitTemplate方法被定義為只能在單例下執行

因此,新生成的rabbitTemplate對象不會再去加載 spring.rabbitmq.template.* 的配置屬性

2、已經忽略的點

下述情況、規範或場景未周詳考慮,根據需要可逐步完善 ^-^

代碼規範:類注釋、方法注釋、命名規範等日誌記錄:記錄的格式、時機、粒度等前端交互:前端的約定、格式等非消息異常:捕獲、處理等補償重發:消息發送失敗後的補償

04小結

今天主要介紹了如何構建RabbitMQ消息發布端代碼

小夥伴們都了解了嗎?

下次小T將繼續介紹RabbitMQ開發

對於今天的內容有任何疑問或問題,歡迎留言或討論 ^-^

05本文涉及的代碼

=============================

= pom =

=============================

微服務依賴jar包spring-boot-starter-2.2.6.RELEASEspring-boot-starter-web-2.2.6.RELEASERabbitMQ依賴jar包spring-boot-starter-amqp-2.2.6.RELEASEMySQL依賴jar包mysql-connector-java-8.0.19Mybatis依賴jar包mybatis-spring-boot-starter-1.3.2資料庫連接池依賴jar包druid-spring-boot-starter-1.1.21Lombok依賴jar包lombok-1.18.12=============================

= properties =

=============================

#這裡主要關注RabbitMQ的配置信息

spring.rabbitmq.host=rabbitMQ的IP位址

spring.rabbitmq.port=rabbitMQ的埠號(默認5672)

spring.rabbitmq.username=rabbitMQ的用戶名

spring.rabbitmq.password=rabbitMQ的密碼

spring.rabbitmq.virtual-host=需要使用的虛擬機名稱

#是否開啟發布確認(確認消息是否發送到Exchange)

spring.rabbitmq.publisher-confirm-type=correlated

spring.rabbitmq.publisher-confirms=true

#是否開啟發布確認(確認消息是否從Exchange發送到Queue)

spring.rabbitmq.publisher-returns=true

=============================

= java =

=============================

@Configuration

public class RabbitMQConfiguration {

/**

* 消息隊列user1名稱

*/

public static final String MQ_QUEUE_USER1 = "user1";

/**

* 消息隊列user2名稱

*/

public static final String MQ_QUEUE_USER2 = "user2";

/**

* 交換機user名稱

*/

public static final String MQ_FANOUT_EXCHANGE_USER = "user";

/**

* 交換機test名稱

*/

public static final String MQ_FANOUT_EXCHANGE_TEST = "test";

/**

* 定義RabbitTemplate

* @param connectionFactory 連接工廠

* @return

*/

@Bean

//Todo:定義生命周期為prototype

@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)

public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

//Todo:mandatory屬性用於開啟 「確認消息是否從交換機發送到消息隊列」

rabbitTemplate.setMandatory(true);

return rabbitTemplate;

}

/**

* 定義Queue、Exchange

* @return

*/

@Bean

public Declarables declarables() {

//Todo:下面的所有durable()方法用於持久化保存,如果沒有聲明將不能實現持久化

Queue userQueue1 = QueueBuilder.durable(MQ_QUEUE_USER1).build();

Queue userQueue2 = QueueBuilder.durable(MQ_QUEUE_USER2).build();

FanoutExchange fanoutExchange = ExchangeBuilder

.fanoutExchange(MQ_FANOUT_EXCHANGE_USER)

.durable(true)

.build();

DirectExchange directExchange = ExchangeBuilder

.directExchange(MQ_FANOUT_EXCHANGE_TEST)

.durable(true)

.build();

return new Declarables(userQueue1, userQueue2, fanoutExchange, directExchange,

BindingBuilder.bind(userQueue1).to(fanoutExchange),

BindingBuilder.bind(userQueue2).to(fanoutExchange));

}

}

@Slf4j

@RestController

@RequestMapping(value = "/user")

public class MyWebController {

//此處省略

... ...

//Todo:模擬業務請求的方法,並實現數據落地保存和消息發送功能

@RequestMapping(value = "insertUser", method = RequestMethod.POST)

public void insertUser(@RequestBody User user) {

//業務數據保存到資料庫:保存用戶信息

this.userService.insertUser(user);

//生成消息數據:將用戶信息轉換成消息

String jsonStr = this.userMessageService.getMessageByUser(user);

//消息數據保存到資料庫:保存消息

this.userMessageService.insertMessage(user.getId(), jsonStr);

//發送消息到MQ

this.mqService.sendUserMessage(user.getId(), jsonStr);

}

}

@Slf4j

@Service

public class MQSenderImpl implements MQSender {

//消息落地數據處理

@Autowired

private UserMessageMapper userMessageMapper;

//異常消息補償數據處理

@Autowired

private UserMessageCPMapper userMessageCPMapper;

@Lookup

public RabbitTemplate getRabbitTemplate() {

return null;

}

@Override

public void sendUserMessage(Long userId, String jsonStr) {

//Todo:每次動態生成一個新的rabbitTemplate對象

RabbitTemplate rabbitTemplate = this.getRabbitTemplate();

//Todo:確認消息是否發送到Exchange(成功或失敗都會執行此方法)

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

//如果發送成功,記錄日誌

log.info("succeed to send user to RabbitMQ, userID: " + user.getId());

} else {

//如果發送失敗,記錄到消息補償表

UserMessageCP userMessageCP = UserMessageCP.builder()

.userid(user.getId())

.message(jsonStr)

.status(UserMessageStatus.SEND_EXCHANGE_FAILURE.getSend_status())

.errorMsg(cause)

.build();

this.userMessageCPMapper.insert(userMessageCP);

log.error("failed to send user to RabbitMQ, userId: " + user.getId());

}

});

//Todo:確認消息是否從Exchange發送到Queue,如果發送沒有成功會執行此方法(成功則不執行)

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

String errorMsg = String.format("replyCode:%s,replyText:%s,exchange:%s,routingKey:%s", replyCode, replyText, exchange, routingKey);

log.error("未發送到隊列: " + errorMsg);

//Todo:記錄到消息補償表

UserMessageCP userMessageCP = UserMessageCP.builder()

.userid(user.getId())

.message(jsonStr)

.status(UserMessageStatus.SEND_QUEUE_FAILURE.getSend_status())

.errorMsg(errorMsg)

.build();

this.userMessageCPMapper.insert(userMessageCP);

});

//Todo:發送消息到MQ,MessageDurableHelper是自定義的消息持久化輔助類

rabbitTemplate.convertAndSend(RabbitMQConfiguration.MQ_FANOUT_EXCHANGE_TEST, "",

MessageDurableHelper.durable(jsonStr));

}

}

public class MessageDurableHelper {

public static Message durable(String msg) {

if(StringUtils.isEmpty(msg)) {

return null;

}

Message message = MessageBuilder.withBody(msg.getBytes()).build();

//Todo:消息持久化的實現

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

return message;

}

}

相關焦點

  • Springboot2.2.6構建RabbitMQ消息異常處理
    Springboot2.2.6構建RabbitMQ消息接收端代碼Springboot2.2.6構建RabbitMQ消息發布端代碼>大家好,我是技術人小Top今天咱們來介紹如何使用RabbitMQ構建消息異常處理 ^-^
  • 詳解SpringCloud中RabbitMQ消息隊列原理及配置,一篇就夠!
    rabbitmq已經被spring-boot做了整合訪問實現。spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。 --><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
  • Spring Boot 2.4 手工和 SDKMAN! 安裝 Spring Boot 命令行
    可用的下載地址,請參考下面的連結:spring-boot-cli-2.4.2-SNAPSHOT-bin.zipspring-boot-cli-2.4.2-SNAPSHOT-bin.tar.gzSapshot 版本下載,snapshot 版本的意思是從最新的原始碼庫中進行編譯構建的,通常這個版本具有更多的 Bug 修復,下載地址請訪問下面的連結: snapshot 構建版本
  • Spring Boot Admin 2.2.4 發布,兼容最新版本 Spring Boot
    spring boot admin 2.2.4 版本發布,本版本為 bug 修復版本 主要兼容 spring boot 2.3.x。
  • 基於Spring Boot 2.2.6實現Rest風格的文件上傳&下載APIs-附源碼
    基於Spring Boot 2.0實戰系列源碼已經Push到Github倉庫:https://github.com/ramostear/springboot2.0-action 。感興趣的朋友歡迎Star/Fork。1.
  • spring-boot-plus V1.2.3 發布,新增 CentOS 相關腳本
    [V1.2.3-RELEASE] 2019.09.09 💻spring-boot-plusV1.2.3發布,CentOS快速安裝環境/構建/部署/啟動項目⭐️ New Features
  • Spring Boot 2.X 實戰--SQL 資料庫(MyBatis)
    博客主頁:https://me.csdn.net/u010974701原始碼倉庫:https://github.com/zhshuixian/learn-spring-boot-2考慮到 MyBatis 應用比較廣泛,這裡將會使用 MyBatis 作為主要的 Java 持久層框架,對 MyBatis Plus 感興趣的讀者可以對照本小節內容,參考其官網 https
  • 基於Spring Boot+Cloud構建微雲架構
    Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • JeecgBoot 2.4 微服務正式版發布,基於 SpringBoot 的低代碼平臺
    前後端分離架構 SpringBoot2.x,SpringCloud,Ant Design&Vue,Mybatis-plus,Shiro,JWT 支持微服務。強大的代碼生成器讓前後端代碼一鍵生成,實現低代碼開發! JeecgBoot 引領新的低代碼開發模式(OnlineCoding-> 代碼生成-> 手工MERGE), 幫助解決Java項目70%的重複工作,讓開發更多關注業務。
  • SpringBoot+RabbitMQ (保證消息100%投遞成功並被消費)
    三、項目介紹springboot版本2.1.5.RELEASE, 舊版本可能有些配置屬性不能使用, 需要以代碼形式進行配置RabbitConfig: rabbitmq相關配置TestServiceImpl: 生產者, 發送消息MailConsumer: 消費者, 消費消息, 發送郵件ResendMsg: 定時任務, 重新投遞發送失敗的消息說明
  • 如何使用Spring Boot與RabbitMQ結合實現延遲隊列
    初始化工程首先我們在Intellij中創建一個Spring Boot工程,並且添加 spring-boot-starter-amqp 擴展。配置隊列從上述的流程圖中我們可以看到,一個延遲隊列的實現,需要一個緩衝隊列以及一個實際的消費隊列。
  • Spring Boot 2.0 Release Notes 中文版
    在啟動時,使用spring-boot-starter-webflux starter POM,它將提供支持Spring WebFlux的嵌入式Netty Server。有關詳細信息,請參閱Spring Boot參考文檔。
  • Spring Boot 和 Spring 到底有啥區別?
    但是通過使用Spring JDBC模塊的JDBCTemplate,我們可以將操作簡化為幾行代碼。Spring Boot基本上是Spring框架的擴展,它消除了設置Spring應用程式所需的XML配置,為更快,更高效的開發生態系統鋪平了道路。2)嵌入式Tomcat、Jetty、 Undertow容器(無需部署war文件)。6)完全沒有代碼生成和XML配置要求。
  • 一起來學 SpringBoot 2.x | 第十五篇:actuator 與 spring-boot-admin 可以說的秘密
    >        <version>2.0.0</version>    </dependency>        <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spri
  • SwaggerSpringBootStarter 2.1.1 版本更新發布
    SwaggerSpringBootStarter更新到2.1.1版本了,配套依賴spring boot 2.1.1版本。
  • Spring Boot2.2.2整合H2和MySQL自由切換數據源
    </groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!
  • Spring 和 Spring Boot 之間到底有啥區別?
    2、嵌入式 Tomcat、 Jetty、 Undertow容器(無需部署war文件)。3、提供的 starters 簡化構建配置4、儘可能自動配置 spring應用。5、提供生產指標,例如指標、健壯檢查和外部化配置6、完全沒有代碼生成和 XML配置要求從配置分析Maven依賴首先,讓我們看一下使用Spring創建Web應用程式所需的最小依賴項<dependency><groupId>org.springframework</groupId><artifactId
  • 實戰|SpringBoot+RabbitMQ,保證消息100%投遞成功並被消費
    (ack)三、項目介紹springboot版本2.1.5.RELEASE, 舊版本可能有些配置屬性不能使用, 需要以代碼形式進行配置RabbitMQ版本3.7.15MailUtil: 發送郵件工具類RabbitConfig: rabbitmq相關配置TestServiceImpl: 生產者, 發送消息MailConsumer: 消費者, 消費消息, 發送郵件ResendMsg
  • 基於Spring Boot和Spring Cloud實現微服務架構
    Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • Spring Boot 常見錯誤及解決方法
    Jar 包啟動不了執行 Spring Boot 構建的 jar 包後,返回 「my.jar 中沒有主清單屬性」 錯誤。錯誤分析: Spring Boot 的正常 jar 包運行方是通過 spring-boot-loader 這個模塊裡的 JarLauncher 完成的,該類內部提供了一套運行的規範。