接續之前文章
AMQP協議、模型及RabbitMQ常用組件消息中間件RabbitMQ、微服務,以及數據一致性問題消息中間件RabbitMQ,為什麼使用RabbitMQ以及它支持的場景
大家好,我是技術人小Top
今天咱們來介紹如何使用RabbitMQ構建消息發布端 ^-^
上次介紹了RabbitMQ底層核心原理,即AMQP協議及模型,以及模型所涉及的組件都是什麼
具體到應用開發,需要使用RabbitMQ API來實現具體業務場景
現在開始進入實戰
01消息發布端項目pom文件
依賴jar包
02消息發布端代碼結構
1、yml/properties配置
包含微服務、資料庫連接池、Mybatis、RabbitMQ等自動配置信息
2、config包
包含RabbitMQ的Bean配置信息
定義並聲明Exchange、Queue定義一個非單例的RabbitTemplate
3、control包
包含Controller層的控制類
模擬發起http業務請求完成業務數據、消息數據落地保存完成並確保消息發送
4、service包
包含Service層的業務類
完成業務數據處理完成消息數據處理完成消息發送和異常處理
5、其他包或配置
其他包的配置不涉及RabbitMQ使用,為節省篇幅此處暫且忽略
03關鍵點說明
1、容易忽略的點
感興趣的小夥伴們可自行測試驗證,有任何想法可交流 ^-^
(1)定義RabbitTemplate的生命周期為prototype
此處定義為prototype,原因在於當有多個消息發送時,發布確認API會校驗RabbitTemplate實例是否唯一(即每個RabbitTemplate只支持一個ConfirmCallback)
當RabbitTemplate為單例時會報異常 」Only one ConfirmCallback is supported by each RabbitTemplate「
(2)配置文件中的 」確認消息是否從Exchange發送到Queue「 配置不生效
在yaml或properties文件中配置 spring.rabbitmq.template.mandatory=true不生效
原因在於 」RabbitTemplate的生命周期定義為prototype「,每次調用rabbitTemplate時會重新生成一個新的rabbitTemplate對象
而RabbitAutoConfiguration類中的內部類RabbitTemplateConfiguration的rabbitTemplate方法被定義為只能在單例下執行
因此,新生成的rabbitTemplate對象不會再去加載 spring.rabbitmq.template.* 的配置屬性
2、已經忽略的點
下述情況、規範或場景未周詳考慮,根據需要可逐步完善 ^-^
代碼規範:類注釋、方法注釋、命名規範等日誌記錄:記錄的格式、時機、粒度等前端交互:前端的約定、格式等非消息異常:捕獲、處理等補償重發:消息發送失敗後的補償
04小結
今天主要介紹了如何構建RabbitMQ消息發布端代碼
小夥伴們都了解了嗎?
下次小T將繼續介紹RabbitMQ開發
對於今天的內容有任何疑問或問題,歡迎留言或討論 ^-^
05本文涉及的代碼
=============================
= pom =
=============================
微服務依賴jar包spring-boot-starter-2.2.6.RELEASEspring-boot-starter-web-2.2.6.RELEASERabbitMQ依賴jar包spring-boot-starter-amqp-2.2.6.RELEASEMySQL依賴jar包mysql-connector-java-8.0.19Mybatis依賴jar包mybatis-spring-boot-starter-1.3.2資料庫連接池依賴jar包druid-spring-boot-starter-1.1.21Lombok依賴jar包lombok-1.18.12=============================
= properties =
=============================
#這裡主要關注RabbitMQ的配置信息
spring.rabbitmq.host=rabbitMQ的IP位址
spring.rabbitmq.port=rabbitMQ的埠號(默認5672)
spring.rabbitmq.username=rabbitMQ的用戶名
spring.rabbitmq.password=rabbitMQ的密碼
spring.rabbitmq.virtual-host=需要使用的虛擬機名稱
#是否開啟發布確認(確認消息是否發送到Exchange)
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-confirms=true
#是否開啟發布確認(確認消息是否從Exchange發送到Queue)
spring.rabbitmq.publisher-returns=true
=============================
= java =
=============================
@Configuration
public class RabbitMQConfiguration {
/**
* 消息隊列user1名稱
*/
public static final String MQ_QUEUE_USER1 = "user1";
/**
* 消息隊列user2名稱
*/
public static final String MQ_QUEUE_USER2 = "user2";
/**
* 交換機user名稱
*/
public static final String MQ_FANOUT_EXCHANGE_USER = "user";
/**
* 交換機test名稱
*/
public static final String MQ_FANOUT_EXCHANGE_TEST = "test";
/**
* 定義RabbitTemplate
* @param connectionFactory 連接工廠
* @return
*/
@Bean
//Todo:定義生命周期為prototype
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//Todo:mandatory屬性用於開啟 「確認消息是否從交換機發送到消息隊列」
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
/**
* 定義Queue、Exchange
* @return
*/
@Bean
public Declarables declarables() {
//Todo:下面的所有durable()方法用於持久化保存,如果沒有聲明將不能實現持久化
Queue userQueue1 = QueueBuilder.durable(MQ_QUEUE_USER1).build();
Queue userQueue2 = QueueBuilder.durable(MQ_QUEUE_USER2).build();
FanoutExchange fanoutExchange = ExchangeBuilder
.fanoutExchange(MQ_FANOUT_EXCHANGE_USER)
.durable(true)
.build();
DirectExchange directExchange = ExchangeBuilder
.directExchange(MQ_FANOUT_EXCHANGE_TEST)
.durable(true)
.build();
return new Declarables(userQueue1, userQueue2, fanoutExchange, directExchange,
BindingBuilder.bind(userQueue1).to(fanoutExchange),
BindingBuilder.bind(userQueue2).to(fanoutExchange));
}
}
@Slf4j
@RestController
@RequestMapping(value = "/user")
public class MyWebController {
//此處省略
... ...
//Todo:模擬業務請求的方法,並實現數據落地保存和消息發送功能
@RequestMapping(value = "insertUser", method = RequestMethod.POST)
public void insertUser(@RequestBody User user) {
//業務數據保存到資料庫:保存用戶信息
this.userService.insertUser(user);
//生成消息數據:將用戶信息轉換成消息
String jsonStr = this.userMessageService.getMessageByUser(user);
//消息數據保存到資料庫:保存消息
this.userMessageService.insertMessage(user.getId(), jsonStr);
//發送消息到MQ
this.mqService.sendUserMessage(user.getId(), jsonStr);
}
}
@Slf4j
@Service
public class MQSenderImpl implements MQSender {
//消息落地數據處理
@Autowired
private UserMessageMapper userMessageMapper;
//異常消息補償數據處理
@Autowired
private UserMessageCPMapper userMessageCPMapper;
@Lookup
public RabbitTemplate getRabbitTemplate() {
return null;
}
@Override
public void sendUserMessage(Long userId, String jsonStr) {
//Todo:每次動態生成一個新的rabbitTemplate對象
RabbitTemplate rabbitTemplate = this.getRabbitTemplate();
//Todo:確認消息是否發送到Exchange(成功或失敗都會執行此方法)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
//如果發送成功,記錄日誌
log.info("succeed to send user to RabbitMQ, userID: " + user.getId());
} else {
//如果發送失敗,記錄到消息補償表
UserMessageCP userMessageCP = UserMessageCP.builder()
.userid(user.getId())
.message(jsonStr)
.status(UserMessageStatus.SEND_EXCHANGE_FAILURE.getSend_status())
.errorMsg(cause)
.build();
this.userMessageCPMapper.insert(userMessageCP);
log.error("failed to send user to RabbitMQ, userId: " + user.getId());
}
});
//Todo:確認消息是否從Exchange發送到Queue,如果發送沒有成功會執行此方法(成功則不執行)
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String errorMsg = String.format("replyCode:%s,replyText:%s,exchange:%s,routingKey:%s", replyCode, replyText, exchange, routingKey);
log.error("未發送到隊列: " + errorMsg);
//Todo:記錄到消息補償表
UserMessageCP userMessageCP = UserMessageCP.builder()
.userid(user.getId())
.message(jsonStr)
.status(UserMessageStatus.SEND_QUEUE_FAILURE.getSend_status())
.errorMsg(errorMsg)
.build();
this.userMessageCPMapper.insert(userMessageCP);
});
//Todo:發送消息到MQ,MessageDurableHelper是自定義的消息持久化輔助類
rabbitTemplate.convertAndSend(RabbitMQConfiguration.MQ_FANOUT_EXCHANGE_TEST, "",
MessageDurableHelper.durable(jsonStr));
}
}
public class MessageDurableHelper {
public static Message durable(String msg) {
if(StringUtils.isEmpty(msg)) {
return null;
}
Message message = MessageBuilder.withBody(msg.getBytes()).build();
//Todo:消息持久化的實現
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
}