Spring集成RabbitMQ簡單實現RPC

2020-12-15 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException {

return this.convertSendAndReceive(this.exchange, routingKey, message, null);

}

spring整合Rabbit MQ提供了Reply來實現RPC,AMQP協議定義了14中消息的屬性,其中兩項,一項是Replyto,表示返回消息的隊列,一個是correlationId 用來表示發送消息和返回消息的標誌,來區分是否是一個調用

下面一步步來實現RPC

首先貼出spring配置文件代碼

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"

xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xmlns:prpc="http://www.pinnettech.com/schema/rpc"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.0.xsd

http://www.springframework.org/schema/tx

http://www.springframework.org/schema/tx/spring-tx-4.0.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context-4.0.xsd

http://www.springframework.org/schema/aop

http://www.springframework.org/schema/aop/spring-aop-4.0.xsd

http://www.pinnettech.com/schema/rpc

http://www.pinnettech.com/schema/springtag.xsd

http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<context:component-scan

base-package="com.temp.rabbit">

</context:component-scan>

<!-- rabbit消息發送方 -->

<!-- 連接服務配置 如果MQ伺服器在遠程伺服器上,請新建用戶用新建的用戶名密碼 guest默認不允許遠程登錄-->

<rabbit:connection-factory id="rabbitConnectionFactory" host="localhost" username="dengwei" password="dengwei"

port="5672" virtual-host="/" channel-cache-size="5"/>

<rabbit:admin connection-factory="rabbitConnectionFactory"/>

<!-- 發送消息可以帶*,綁定關係需全單詞 -->

<rabbit:direct-exchange name="rpc.bao.direct.goods" durable="true" auto-delete="false">

<rabbit:bindings>

<rabbit:binding queue="rpc.bao.goods" key="dengwei.goods"/>

</rabbit:bindings>

</rabbit:direct-exchange>

<!-- durable是否持久化 exclusive:是否排外的-->

<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rpc.bao.goods"/>

<!-- 消息轉換器 -->

<bean id="byteMessageConverter"/>

<!-- 發送消息模板 -->

<rabbit:template id="amqTemplate" exchange="rpc.bao.direct.goods"

connection-factory="rabbitConnectionFactory" message-converter="byteMessageConverter" />

<!-- 消息發送方 end -->

<!-- 消息接受方處理器 -->

<bean id="msgHandler"/>

<!-- 消息消費者 -->

<bean id="msgLisenerAdapter">

<constructor-arg name="delegate" ref="msgHandler"/>

<constructor-arg name="messageConverter" ref="byteMessageConverter"/>

</bean>

<!-- 消費者容器 -->

<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto">

<rabbit:listener queues="rpc.bao.goods" ref="msgLisenerAdapter"/>

</rabbit:listener-container>

<!-- 消息接收方 end -->

<!-- RPC 配置 -->

<!-- 消息服務提供接口實現 -->

<bean id="service1"/>

<!-- 代理類 -->

<bean id="service1Proxy">

<property name="t" ref="service1"></property>

</bean>

<!-- 代理對象 -->

<bean id="proxyService" factory-bean="service1Proxy" factory-method="getProxy"/>

</beans>

其中消息轉換器類

package com.temp.rabbit;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;import org.springframework.amqp.support.converter.MessageConversionException;import org.springframework.util.SerializationUtils;public class BytesMessageConverter extends AbstractJsonMessageConverter{@Overrideprotected Message createMessage(Object msg, MessageProperties msgPro) {byte[] data = SerializationUtils.serialize(msg);msgPro.setContentLength(data.length);System.out.println("create message "+msg.getClass());return new Message(data , msgPro);}@Overridepublic Object fromMessage(Message msg) throws MessageConversionException {byte[] data = msg.getBody() ;Object result = SerializationUtils.deserialize(data);System.out.println("create obj "+result.getClass());return result;}}

消費者處理handler

package com.temp.rabbit.receive;import java.lang.reflect.Method;import com.temp.rabbit.bean.RpcRequest;import com.temp.rabbit.bean.RpcResponse;import com.temp.rabbit.bean.TempServiceImp;public class MessageHandler {//沒有設置默認的處理方法的時候,方法名是handleMessagepublic RpcResponse handleMessage(RpcRequest message){Class<?> clazz = message.getClassName() ;RpcResponse response = new RpcResponse();Method method;try {method = clazz.getMethod(message.getMethodName(), message.getParamType());Object result = method.invoke(new TempServiceImp(), message.getParams());response.setResult(result);} catch (Exception e) {e.printStackTrace();}return response ;}}

服務提供:

package com.temp.rabbit.bean;public class TempServiceImp implements TempService {public String sayHello(){return "TempServiceImp hello ... " ;}}

代理類:

package com.temp.rabbit.receive.proxy;import java.io.Serializable;import java.lang.reflect.Method;import org.springframework.beans.factory.annotation.Autowired;import com.temp.rabbit.bean.RpcRequest;import com.temp.rabbit.bean.RpcResponse;import com.temp.rabbit.send.SendRabbitMsgImp;import net.sf.cglib.proxy.Enhancer;import net.sf.cglib.proxy.MethodInterceptor;import net.sf.cglib.proxy.MethodProxy;public class ServiceProxy<T> implements MethodInterceptor{private Enhancer enhancer = new Enhancer();private T t ;public void setT(T t){this.t = t ;}@Autowiredprivate SendRabbitMsgImp rabbitMsg ;public Object getProxy(){enhancer.setSuperclass(t.getClass());enhancer.setCallback(this);return enhancer.create();}@Overridepublic Object intercept(Object obj, Method method, Object[] param, MethodProxy proxy) throws Throwable {RpcRequest request = new RpcRequest();request.setMethodName(method.getName());request.setClassName(t.getClass());Class<?>[] paramType = new Class<?>[param.length];Serializable[] para = new Serializable[param.length];for(int i = 0 ; i < param.length ; i ++){paramType[i] = param[i].getClass();para[i] = (Serializable)param[i];}request.setParams(para);request.setParamType(paramType);RpcResponse result = (RpcResponse)rabbitMsg.sendAdcReceive("dengwei.goods", request) ;return result.getResult();}}

主程序

package com.temp;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import com.temp.rabbit.bean.TempService;public class Main {public static void main(String[] args) {ApplicationContext app = new ClassPathXmlApplicationContext("classpath:spring.xml");TempService proxy = (TempService)app.getBean("proxyService");System.out.println("main result " + proxy.sayHello()) ;}}

消息發送實現:

package com.temp.rabbit.send;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component("sendMsg")public class SendRabbitMsgImp implements SendRabbitMsg{@Autowiredprivate RabbitTemplate template ;@Overridepublic void sendData2Queue(String queueKey, Object msg) {try {template.convertAndSend(queueKey, msg);} catch (Exception e) {e.printStackTrace();System.out.println("send data 2 msg erro ");}System.out.println("消息已發送");}@Overridepublic Object sendAdcReceive(String queueKey , Object msg){try {return template.convertSendAndReceive(queueKey, msg);} catch (Exception e) {e.printStackTrace();System.out.println("send data 2 msg erro ");}System.out.println("消息已發送");return null ;}}

這裡面的RpcRequest和RpcResponse就不貼代碼了

這裡講一下原理實現,我們可以跟著源碼看一下

首先調用的是RabbitTemplate的

public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException {return this.convertSendAndReceive(this.exchange, routingKey, message, null);}

然後一路走下去

@Overridepublic Object convertSendAndReceive(final String exchange, final String routingKey, final Object message,final MessagePostProcessor messagePostProcessor) throws AmqpException {Message requestMessage = convertMessageIfNecessary(message);if (messagePostProcessor != null) {requestMessage = messagePostProcessor.postProcessMessage(requestMessage);}Message replyMessage = this.doSendAndReceive(exchange, routingKey, requestMessage);if (replyMessage == null) {return null;}return this.getRequiredMessageConverter().fromMessage(replyMessage);}protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) {if (this.replyQueue == null) {return doSendAndReceiveWithTemporary(exchange, routingKey, message);}else {return doSendAndReceiveWithFixed(exchange, routingKey, message);}}

到這裡我們會看到,有一個分支如果replyqueue不為空則是走另外的一個方法,因為之前沒有設置replyqueue所以,這裡會

走第一步方法,也就是doSendAndReceiveWithTemporary

來看一下這個方法源碼

protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) {return this.execute(new ChannelCallback<Message>() {@Overridepublic Message doInRabbit(Channel channel) throws Exception {final ArrayBlockingQueue<Message> replyHandoff = new ArrayBlockingQueue<Message>(1);Assert.isNull(message.getMessageProperties().getReplyTo(),"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");DeclareOk queueDeclaration = channel.queueDeclare();String replyTo = queueDeclaration.getQueue();message.getMessageProperties().setReplyTo(replyTo);String consumerTag = UUID.randomUUID().toString();DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties(properties, envelope, encoding);Message reply = new Message(body, messageProperties);if (logger.isTraceEnabled()) {logger.trace("Message received " + reply);}try {replyHandoff.put(reply);}catch (InterruptedException e) {Thread.currentThread().interrupt();}}};channel.basicConsume(replyTo, true, consumerTag, true, true, null, consumer);doSend(channel, exchange, routingKey, message, null);Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout,TimeUnit.MILLISECONDS);channel.basicCancel(consumerTag);return reply;}});}

這裡流程就是申明一個大小為1的臨時隊列,然後發送消息,然後監聽返回的消息,放到臨時隊列,然後取出返回消息。

那麼因為每次都會創建臨時隊列,所以對性能是個考驗那麼有第二種方式,在rabbitmq中申明一個返回隊列,用來存放該服務的返回消息。

那麼需要在spring配置文件中配置一個reply隊列

<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="reply"/>

然後在消息監聽容器中再配置一個發送消息的模板template為消費者

<!-- 消費者容器 --> <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto"> <rabbit:listener queues="reply" ref="amqTemplate"/> <rabbit:listener queues="rpc.bao.goods" ref="msgLisenerAdapter"/> </rabbit:listener-container>

最後再發送消息的實現中即SendRabbitMsgImp類中注入隊列

@Autowired@Qualifier("reply")private Queue reply ;

然後設置template的replyqueue為reply ;template.setReplyQueue(reply);

這個設置代碼可以再初始化方法中,也可以再發送消息之前,其實最好的實在spring中設置

那麼該說原理了,我們可以看最開始發送消息的第二個方法

protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) {

if (this.replyQueue == null) {

return doSendAndReceiveWithTemporary(exchange, routingKey, message);

}

else {

return doSendAndReceiveWithFixed(exchange, routingKey, message);

}

}

protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message) {

return this.execute(new ChannelCallback<Message>() {

@Override

public Message doInRabbit(Channel channel) throws Exception {

final PendingReply pendingReply = new PendingReply();

String messageTag = UUID.randomUUID().toString();

RabbitTemplate.this.replyHolder.put(messageTag, pendingReply);

// Save any existing replyTo and correlation data

String savedReplyTo = message.getMessageProperties().getReplyTo();

pendingReply.setSavedReplyTo(savedReplyTo);

if (StringUtils.hasLength(savedReplyTo) && logger.isDebugEnabled()) {

logger.debug("Replacing replyTo header:" + savedReplyTo

+ " in favor of template's configured reply-queue:"

+ RabbitTemplate.this.replyQueue.getName());

}

message.getMessageProperties().setReplyTo(RabbitTemplate.this.replyQueue.getName());

String savedCorrelation = null;

if (RabbitTemplate.this.correlationKey == null) { // using standard correlationId property

byte[] correlationId = message.getMessageProperties().getCorrelationId();

if (correlationId != null) {

savedCorrelation = new String(correlationId,

RabbitTemplate.this.encoding);

}

}

else {

savedCorrelation = (String) message.getMessageProperties()

.getHeaders().get(RabbitTemplate.this.correlationKey);

}

pendingReply.setSavedCorrelation(savedCorrelation);

if (RabbitTemplate.this.correlationKey == null) { // using standard correlationId property

message.getMessageProperties().setCorrelationId(messageTag

.getBytes(RabbitTemplate.this.encoding));

}

else {

message.getMessageProperties().setHeader(

RabbitTemplate.this.correlationKey, messageTag);

}

if (logger.isDebugEnabled()) {

logger.debug("Sending message with tag " + messageTag);

}

doSend(channel, exchange, routingKey, message, null);

LinkedBlockingQueue<Message> replyHandoff = pendingReply.getQueue();

Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout,

TimeUnit.MILLISECONDS);

RabbitTemplate.this.replyHolder.remove(messageTag);

return reply;

}

});

}

這個方法並沒有申請臨時隊列,發送消息後直接再pendingReply中的隊列中取,那麼怎麼放到pendingReply的隊列中區的呢,可以看到,RabbitTemplate是實現了MessageLIstener,那麼看他實現的onMessage方法

public void onMessage(Message message) {try {String messageTag;if (this.correlationKey == null) { // using standard correlationId propertymessageTag = new String(message.getMessageProperties().getCorrelationId(), this.encoding);}else {messageTag = (String) message.getMessageProperties().getHeaders().get(this.correlationKey);}if (messageTag == null) {logger.error("No correlation header in reply");return;}PendingReply pendingReply = this.replyHolder.get(messageTag);if (pendingReply == null) {if (logger.isWarnEnabled()) {logger.warn("Reply received after timeout for " + messageTag);}}else {// Restore the inbound correlation dataString savedCorrelation = pendingReply.getSavedCorrelation();if (this.correlationKey == null) {if (savedCorrelation == null) {message.getMessageProperties().setCorrelationId(null);}else {message.getMessageProperties().setCorrelationId(savedCorrelation.getBytes(this.encoding));}}else {if (savedCorrelation != null) {message.getMessageProperties().setHeader(this.correlationKey,savedCorrelation);}else {message.getMessageProperties().getHeaders().remove(this.correlationKey);}}// Restore any inbound replyToString savedReplyTo = pendingReply.getSavedReplyTo();message.getMessageProperties().setReplyTo(savedReplyTo);LinkedBlockingQueue<Message> queue = pendingReply.getQueue();queue.add(message);if (logger.isDebugEnabled()) {logger.debug("Reply received for " + messageTag);if (savedReplyTo != null) {logger.debug("Restored replyTo to " + savedReplyTo);}}}}catch (UnsupportedEncodingException e) {throw new AmqpIllegalStateException("Invalid Character Set:" + this.encoding, e);}}

這裡就明白了,根據唯一id也就是前面說的correlationId找到消息的pendingReply,然後將返回的消息放到pendingReply的隊列中,這樣就實現了RPC的調用,

相關焦點

  • 手擼rpc框架,並基於spring進行二次註解開發
    這是我返回給你的結果";該框架包括簡單示例都已上傳至github,連結自取:自定義rpc框架三、自定義rpc框架(一)、代碼架構rpc_apirpc_client:自定義rpc框架客戶端部分,實現了rpc客戶端部分邏輯。
  • 詳解SpringCloud中RabbitMQ消息隊列原理及配置,一篇就夠!
    rabbitmq已經被spring-boot做了整合訪問實現。spring cloud也對springboot做了整合邏輯。所以rabbitmq的依賴可以在spring cloud中直接使用。># 配置rabbitmq連結相關信息。
  • Abp + Grpc 如何實現用戶會話狀態傳遞
    通過這兩個包,你可以很方便地在 Abp 框架當中集成 Grpc 實現服務內部通訊。但是在實際使用當中會出現一個問題,當 A 服務調用 B 服務的時候,A 服務當前登錄用戶為 admin,調用 B 服務的 IAbpSession 的值仍然為空,這個時候當 B 服務內部實現使用了 IAbpSession 時會出現問題。
  • springboot+springsecurity實現前後端分離簡單實現!
    通過各種方式學習springsecurity,在B站、騰訊課堂、網易課堂、慕課網沒有springsecurity的前後端分離的教學視頻,那我就去csdn去尋找springsecurity博客,發現幾個問題:要麼就是前後端不分離,要麼就是通過內存方式讀取數據,而不是通過資料庫的方式讀取數據,要麼就是大佬們給的代碼不全、把代碼講的太繞,關鍵部分沒有注釋
  • Spring全家桶、Dubbo、分布式、消息隊列後端必備全套開源項目
    入門》 對應 labx-28《Spring Cloud 配置中心 Etcd 入門》Spring Cloud Stream《Spring Cloud 消息隊列 RabbitMQ 入門》 對應 labx-10-spring-cloud-stream-rabbitmq
  • 初識Spring Cloud Stream,什麼是消息驅動微服務框架
    Spring Integration是Spring框架對Enterprise Integration Patterns的實現和適配。Spring Integration在基於Spring的應用程式中實現輕量級消息傳遞,並支持通過聲明適配器與外部系統集成。與Spring對遠程處理,消息傳遞和調度的支持相比,這些適配器提供了更高級別的抽象。
  • SpringBoot+GitLab+Docker+Jenkins實現持續集成下
    接下來我們來編寫一個最簡單的SpringBoot入門項目。<?xml version="1.0" encoding="UTF-8"?--所有的springboot工程都必須繼承spring-boot-starter-parent--> <parent> <groupId>org.springframework.boot</groupId>
  • 聊聊我開源RPC框架的那些事
    簡單吐槽一波,給大家聊聊關於 guide-rpc-framework[1] 的一些事情。我踏馬直接踢01 我的自定義 RPC 框架近況關注我的大部分小夥伴應該都知道,3 個月前,我利用業餘時間手寫一個簡單的 RPC 框架(玩具),名字叫做 guide-rpc-framework。
  • go-zero 1.1.2 發布,web 和 rpc 框架
    go-zero 是一個集成了各種工程實踐的 web 和 rpc 框架。通過彈性設計保障了大並發服務端的穩定性,經受了充分的實戰檢驗。go-zero 包含極簡的 API 定義和生成工具 goctl,可以根據定義的 API 文件一鍵生成 Go, iOS, Android, Kotlin, Dart, TypeScript, JavaScript 代碼,並可直接運行。
  • rpc蓋板攪拌機、rpc活性粉末混凝土攪拌機持續創新順應趨勢
    rpc蓋板攪拌機、rpc活性粉末混凝土攪拌機推薦科尼樂機械,由於設計結構非常合理,工作原理更是由於其他類型混凝土攪拌機,該機器立式筒行星運轉裝置,各種類型的混凝土一進入攪拌機中都能快速攪拌勻和。rpc蓋板攪拌機發揮充分的攪拌功能,產能更是和一般攪拌機無差別,攪拌精細度控制範圍大。rpc蓋板攪拌機rpc蓋板攪拌機將設備傳動裝置進行優化,使攪拌機在平穩的設備運轉下,提供高效的攪拌作用;rpc蓋板攪拌機卸料裝置進行偏心式設計,觸點式幹硬開關,減少磨損,提高強制式混凝土攪拌機的耐用性、可靠性。
  • 一文教你使用Jenkins集成Junit自動化測試,超簡單!
    而實現軟體發布自動化的一個重要工具就是Jenkins。Jenkins是一個開源的、提供友好操作界面的持續集成(CI)工具,主要用於持續、自動的構建/測試軟體項目。Jenkins通常與版本管理工具(SCM)、構建工具結合使用。常用的版本控制工具有SVN、GIT,構建工具有Maven、Ant、Gradle。
  • Spring Boot與Shiro整合實現用戶認證
    ;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration
  • springboot2.1.5集成fineReport報表工具
    集成jar首先將fineReport提供集成的jar包,加入本地倉庫中代碼如下:call mvn install:install-file -Dfile=F:\FineReport_9.0\WebReport\WEB-INF\lib\fr-core-9.0.jar -DgroupId
  • spring框架之註解的使用
    Spring的Web集成。本來還計劃學Spring的junit測試集成的,結果又沒時間了。其實無論是spring註解也好,還是配置xml也罷,都是省略new對象這個步驟。將dao層和service層對象關聯起來了,沒有new對象實例化,也能調用dao層代碼。當然spring肯定不止這麼簡單,只不過目前來說還沒有學到其它的知識點。
  • 4年 46 個版本,一文讀懂 Spring Cloud 發展歷史
    spring-cloud-commons 模塊對服務註冊/發現模型進行統一;分布式配置:Configuration Management。spring-core 模塊對配置有 Environment 和 PropertySource 抽象,各個配置中心可以添加各自實現的 PropertySource;服務熔斷:Circuit Breaker。
  • 深入淺出Spring 5,使用Spring 5的反應式WebSocket
    Maven依賴我們將使用開箱即用的spring-boot-starters依賴項來進行spring-boot-integration和spring-boot-starter-webflux(目前可在Spring Milestone Repository中獲得)。
  • Jenkins持續集成
    4、測試人員發現bug,提交bug、開發人員修改bug5、bug修改完畢再次集成、測試。1.1.2 什麼是持續集成持續集成(Continuous integration)簡稱CI,持續集成的思想是每天要多次將代碼合併到主幹,並進行集成、測試,這樣就可以提早發現錯誤,進行修正。
  • 繼「劉強東」之後京東的第二位程式設計師「呂科」spring面試題講解
    Spring框架本身亦是按照設計模式精心打造,這使得我們可以在開發環境中安心的集成Spring框架,不必擔心Spring是如何在後臺進行工作的。Spring框架至今已集成了20多個模塊。這些模塊主要被分如下圖所示的核心容器、數據訪問/集成,、Web、AOP (面向切面編程)、工具、消息和測試模塊。
  • 「純手打」2萬字長文從0開始Spring Boot(上)
    第一次連接資料庫上面的例子讓我們實現了接口返回數據,舉一反三你可以寫出很多複雜的接口,但是,沒有資料庫的支持,都是死數據,沒意思,對吧,廢話不多說,不搞 JDBC 不搞 hibernate ,直接上現代化 mybatis 框架Spring 在之前集成 mybatis 相當複雜,需要配置很多的xml。
  • Java經典面試題Spring是什麼 Spring框架入門詳解
    那麼Spring是什麼呢,Spring遵循分層的結構思想什麼什麼實現了高內聚低耦合巴拉巴拉一大堆,咬文嚼字不是我的強項,直接開幹,讓你們看看Spring到底是什麼東西。那麼spring是否能夠完成我們自定義java對象的注入呢?