一、rpc是什麼
RPC是遠程過程調用(Remote Procedure Call)的縮寫形式。
客戶端通過網絡傳輸,遠程調用服務端的函數,服務端處理客戶端的調用請求,然後將結果通過網絡傳輸返回客戶端。
二、自定義rpc框架的使用展示
(一)、客戶端代碼
只需將遠程的接口(來自rpc_api包,只有接口)
IHelloService
添加為屬性並使用
@Autowired
即可像使用普通本地對象一樣的調用遠程函數
@Component
public class ServiceTest {
IHelloService helloService;
public String hello(String content) {
System.out.println("===========client:開始調用遠程方法===============");
String res = helloService.sayHello(content);
System.out.println("===========client:開始調用遠程結束===============");
return res;
}
(二)、服務端代碼
只需使用
@RpcServiceImpl(value = IHelloService.class, version = "v1.0")
註解,繼承rpc_api包中的接口進行實現,並啟動容器即可
public class V1HelloServiceImpl implements IHelloService {
@Override
public String sayHello(String content) {
System.out.println("===============server:開始執行遠程方法sayHello【v1.0】===============");
System.out.println(content);
System.out.println("===============server:遠程方法sayHello【v1.0】執行完畢===============");
return "恭喜你,rpc通信完成了!這是我返回給你的結果";
該框架包括簡單示例都已上傳至github,連結自取:自定義rpc框架
三、自定義rpc框架
(一)、代碼架構
rpc_api:遠程服務端提供的方法接口。rpc_client:自定義rpc框架客戶端部分,實現了rpc客戶端部分邏輯。rpc_server:自定義rpc框架服務端部分,實現了rpc服務端部分邏輯。clienct_app:客戶端應用,依賴,和。可以使用下接口的方法(像上面示例一樣,直接使用spring的註解即可),實質是遠程調用實現的方法。:服務端應用,依賴和。實現包的接口,供客戶端遠程調用(像上面示例一樣,只要使用@RpcServiceImpl註解即可)。
(二)、
rpc_api代碼
很簡單,定義服務端提供的api即可,使用自定義註解
@RpcServiceAPI
package com.zte;
import com.zte.annotation.RpcServiceAPI;
public interface IHelloService {
String sayHello(String content);
(三)、
代碼
annotation:api包註冊API註解client:管理socket,並處理遠程調用的邏輯spring_rpc:和mybatis_spring一樣,將我們自定義的rpc框架整合到spirng中,使其使用時直接使用@AutoWired即可RpcRequest:通信時序列化傳輸的對象
1. client
(1)、RpcHandler
當客戶端使用api包下的接口方法時,使用動態代理技術,實際上調用的是我們在
InvocationHandler
中定義的
invoke
方法,而該方法會通過socket通信,將調用信息序列化傳輸至服務端,服務端接收到請求後,處理請求,調用其具體實現類,然後返回結果。動態代理是spring框架的一大基石,掌握理解對開發與面試非常重要,可以參考兩萬字吐血總結,代理模式及手寫實現動態代理(aop原理,基於jdk動態代理)
/**
* api接口的實際處理者
*/
public class RpcHandler implements InvocationHandler {
private String host;
private int port;
private String version;
public RpcHandler(String host, int port, String version) {
this.host = host;
this.port = port;
this.version = version;
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setArgs(args);
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setVersion(version);
return RpcClient.send(rpcRequest, host, port);
(2)、RpcClient
該類建立socket通信發送遠程調用信息至服務端,並接受遠程返回結果
客戶端發送請求
class RpcClient {
static Object send(RpcRequest rpcRequest, String host, int port) {
Object res = null;
//try with ,實現了Closeable的都可以在執行玩方法體後自動關閉,會處理異常
try (
Socket socket = new Socket(host, port);
final ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
final ObjectInputStream in = new ObjectInputStream(socket.getInputStream())
)
out.writeObject(rpcRequest);
out.flush();
res = in.readObject();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
2.@RpcServiceAPI
標註api的接口,在構造SpringBean時,如果bean中有
標註的接口的成員變量,會使用動態代理注入代理類,負責遠程調用服務端方法。(如同mybatis中使用
@AutoWired UserDao dao
一樣,雖然我們只定義了接口,但是依然可以使用它的方法,原因就是因為,spring將mybatis生成的動態代理類注入了。)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcServiceAPI {
3.spring_rpc
將自定義的rpc框架整合到spring中。可以參考 面試官你好,我自己手寫實現過Mybatis(超詳細注釋)
(1)、RpcClientProxyFactory
* 提供api接口的代理類,實現spring的FactoryBean接口
//該註解可以從配置文件讀取value
@PropertySource("classpath:rpc.properties")
public class RpcClientProxyFactory<T> implements FactoryBean<T> {
@Value("${rpc.host:127.0.0.1}")
@Value("${rpc.port:6666}")
@Value("${rpc.version:v1.0}")
private Class<T> interfaceCls;
public RpcClientProxyFactory(Class<T> interfaceCls) {
this.interfaceCls = interfaceCls;
public Class<T> getInterfaceCls() {
return interfaceCls;
public void setInterfaceCls(Class<T> interfaceCls) {
public T getObject() throws Exception {
//將代理類對象注入容器
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
new Class[]{interfaceCls}, new RpcHandler(host, port, version));
public boolean isSingleton() {
return true;
public Class<T> getObjectType() {
(2)、ServiceBeanDefinitionRegistry
* 用於Spring動態注入rpc接口,類似於spring-mybatis整合,當被@RpcProxyFactory註解的interface使用@Autowired時會注入其代理類
public class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {
private static Set<Class<?>> classCache = new HashSet<>();
private final static String RESOURCE_PATTERN = "/**/*.class";
private static final String BASE_PACKAGE = "com.zte";
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
getAPIClz();
classCache.forEach(
beanClazz -> {
//重新定義被RpcServiceAPI註解標註類的bean生成方法,這裡將代理對象注入容器
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz);
GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
//在這裡,我們可以給該對象的屬性注入對應的實例。
//比如mybatis,就在這裡注入了dataSource和sqlSessionFactory,
// 注意,如果採用definition.getPropertyValues()方式的話,
// 類似definition.getPropertyValues().add("interfaceType", beanClazz);
// 則要求在FactoryBean(本應用中即ServiceFactory)提供setter方法,否則會注入失敗
// 如果採用definition.getConstructorArgumentValues(),
// 則FactoryBean中需要提供包含該屬性的構造方法,否則會注入失敗
//簡單來說,就是將beanClazz注入到RpcClientProxyFactory的構造方法中
definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz);
// 其返回的是該工廠Bean的getObject方法所返回的對象。
definition.setBeanClass(RpcClientProxyFactory.class);
//這裡採用的是byType方式注入,類似的還有byName等
definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);
registry.registerBeanDefinition(beanClazz.getSimpleName(), definition);
);
*
public void getAPIClz() {
* 掃描使用註解RpcServiceAPI的類
ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
try {
String pattern = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + ClassUtils.convertClassNameToResourcePath(BASE_PACKAGE)
+ RESOURCE_PATTERN;
Resource[] resources = resourcePatternResolver.getResources(pattern);
MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
for (Resource resource : resources) {
if (resource.isReadable()) {
MetadataReader reader = readerFactory.getMetadataReader(resource);
//掃描到的class
String className = reader.getClassMetadata().getClassName();
Class<?> clazz = Class.forName(className);
//判斷是否有指定註解
RpcServiceAPI annotation = clazz.getAnnotation(RpcServiceAPI.class);
if (annotation != null) {
//這個類使用了自定義註解
classCache.add(clazz);
System.out.println(e.getMessage());
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
4.RpcRequest
網絡通信的消息體,包括了所有的調用信息
public class RpcRequest implements Serializable {
private static final long serialVersionUID = -5854150492574586489L;
private String className;
private Object[] args;
private String methodName;
public String getVersion() {
return version;
public void setVersion(String version) {
public String getClassName() {
return className;
public void setClassName(String className) {
this.className = className;
public Object[] getArgs() {
return args;
public void setArgs(Object[] args) {
this.args = args;
public String getMethodName() {
return methodName;
public void setMethodName(String methodName) {
this.methodName = methodName;
(四)、rpc_server代碼
RpcServiceImpl註解:服務端應用使用該註解api接口包的實現類server:處理客戶端應用序列化發送而來的調用請求,使用反射,調用對應的實現類方法,並返回調用結果RpcRequest:通信的消息,包括了全類名、方法名,參數列表
1. RpcServiceImpl
服務端應用使用該註解標註api的實現類
* 標註服務實現類,並將實現類添加到spring容器
public @interface RpcServiceImpl {
Class<?> value();
String version();
2.server
(1)、ServerProcessorHandler
處理客戶端的請求,解析客戶端傳遞的消息,調用實現類方法,返回執行結果
* 處理客戶端請求
public class ServerProcessorHandler implements Runnable {
private Socket socket;
private Map<String, Object> handlerMap;
public ServerProcessorHandler(Socket socket, Map<String, Object> handlerMap) {
this.socket = socket;
this.handlerMap = handlerMap;
public void run() {
final RpcRequest request = (RpcRequest) in.readObject();
out.writeObject(invoke(request));
} catch (Exception e) {
private Object invoke(RpcRequest request) throws Exception {
Object res;
String serviceName = request.getClassName();
String version = request.getVersion();
//增加版本號的判斷
if (!StringUtils.isEmpty(version)) {
serviceName += "-" + version;
//得到實現類bean對象
Object service = handlerMap.get(serviceName);
if (service == null) {
throw new RuntimeException("service not found:" + serviceName);
//拿到客戶端請求的參數
Object[] args = request.getArgs();
Method method = null;
Class clazz = Class.forName(request.getClassName());
//無參和有參方法
if (args != null) {
//獲得每個參數的類型
Class<?>[] types = new Class[args.length];
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
method = clazz.getMethod(request.getMethodName(), types);
res = method.invoke(service, request.getArgs());
} else {
method = clazz.getMethod(request.getMethodName());
res = method.invoke(service);
(2)、RpcServer
開啟SocketServer,接受客戶端請求,並將接口對應的實現類對象注入到容器(使用spring提供的切入點),對spring生命周期不熟悉的可以查看圖解Spring中bean的生命周期
* 開啟SocketServer,接受客戶端請求,並將接口對應的實現類對象注入到容器,使用spring提供的鉤子函數,在spring創建bean時會調用afterPropertiesSet和setApplicationContext方法
public class RpcServer implements ApplicationContextAware, InitializingBean {
//推薦自定參數創建
ExecutorService executorService = Executors.newCachedThreadPool();
//k為全類名+版本號,v是對應的實現類。添加版本號可以適配不同客戶端,做灰度發布
private Map<String, Object> handlerMap = new HashMap();
public RpcServer() {
public RpcServer(ExecutorService executorService, Map<String, Object> handlerMap) {
this.executorService = executorService;
public RpcServer(int port) {
public void afterPropertiesSet() throws Exception {
ServerSocket serverSocket = null;
serverSocket = new ServerSocket(port);
System.out.println("server端啟動了");
while (true) {//不斷接受請求
Socket socket = serverSocket.accept();//BIO
//每一個socket 交給一個processorHandler來處理
executorService.execute(new ServerProcessorHandler(socket, handlerMap));
} catch (IOException e) {
} finally {
if (serverSocket != null) {
serverSocket.close();
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcServiceImpl.class);
if (!serviceBeanMap.isEmpty()) {
for (Object servcieBean : serviceBeanMap.values()) {
//拿到註解
RpcServiceImpl rpcService = servcieBean.getClass().getAnnotation((RpcServiceImpl.class));
String serviceName = rpcService.value().getName();//拿到接口類定義
String version = rpcService.version(); //拿到版本號
handlerMap.put(serviceName, servcieBean);
(五)、使用demo
1、客戶端
(1)、客戶端啟動類
@Configuration
@ComponentScan(basePackages = "com.zte")
public class ClientApp {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ClientApp.class);
context.start();
final ServiceTest helloService = context.getBean(ServiceTest.class);
System.out.println(helloService.hello("rpc我也會啦!!"));
(2)、使用遠程接口
//客戶端只需使用@AutoWried註解即可,就可以像使用本地對象一樣調用遠程的api
2、服務端
(1)、 提供api實現類
使用時,只需實現api包中的接口,並使用RpcServiceImpl註解即可
(2)、 服務端啟動類
public class ServerApp {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ServerApp.class);
@Bean
public RpcServer getRpcServer() {
return new RpcServer();
3、配置文件
rpc.host=172.0.0.1
rpc.port=6666
# 服務端沒有該屬性
rpc.version=v1.0
四、運行結果
(一)、客戶端
(二)、服務端