Flink組件通訊過程
RPC(本地/遠程)調用,底層是通過Akka提供的tell/ask方法進行通信。
Flink中RPC框架中涉及的主要類:
Flink的RPC協議通過RpcGateway來定義,主要定義通信行為;用於遠程調用RpcEndpoint的某些方法,可以理解為客服端代理。
若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通信時,必須讓其提供對應地址。
從類繼承圖可以看到基本上所有組件都實現了RpcGateway接口,其代碼如下:
public interface RpcGateway {String getAddress();String getHostname();}RpcEndpoint是通信終端,提供RPC服務組件的生命周期管理(start、stop)。每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,其構造函數如下:
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService =checkNotNull(rpcService, "rpcService");this.endpointId =checkNotNull(endpointId, "endpointId");this.rpcServer =rpcService.startServer(this);this.mainThreadExecutor= new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}構造的時候調用rpcService.startServer()啟動RpcServer,進入可以接收處理請求的狀態,最後將RpcServer綁定到主線程上真正執行起來。
在RpcEndpoint中還定義了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法來執行Rpc調用,值得注意的是在Flink的設計中,對於同一個Endpoint,所有的調用都運行在主線程,因此不會有並發問題,當啟動RpcEndpoint/進行Rpc調用時,其會委託RcpServer進行處理。
Akka有兩種核心的異步通信方式:tell和ask。
RpcService 和 RpcServer是RpcEndPoint的成員變量。
1)RpcService 是Rpc服務的接口,其主要作用如下:
根據提供的RpcEndpoint來啟動和停止RpcServer(Actor);
根據提供的地址連接到RpcServer,並返回一個RpcGateway;
延遲/立刻調度Runnable、Callable;
在Flink中實現類為AkkaRpcService,是 Akka 的 ActorSystem 的封裝,基本可以理解成 ActorSystem 的一個適配器。在ClusterEntrypoint(JobMaster)和TaskManagerRunner(TaskExecutor)啟動的過程中初始化並啟動。
AkkaRpcService中封裝了ActorSystem,並保存了ActorRef到RpcEndpoint的映射關係。RpcService跟RpcGateway類似,也提供了獲取地址和埠的方法。
在構造RpcEndpoint時會啟動指定rpcEndpoint上的RpcServer,其會根據RpcEndpoint類型(FencedRpcEndpoint或其他)來創建不同的AkkaRpcActor(FencedAkkaRpcActor或AkkaRpcActor),並將RpcEndpoint和AkkaRpcActor對應的ActorRef保存起來,AkkaRpcActor是底層Akka調用的實際接收者,RPC的請求在客戶端被封裝成RpcInvocation對象,以Akka消息的形式發送。
最終使用動態代理將所有的消息轉發到InvocationHandler,具體代碼如下:
public <Cextends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {... ...@SuppressWarnings("unchecked") RpcServerserver = (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray(newClass<?>[implementedRpcGateways.size()]), akkaInvocationHandler);return server;}2)RpcServer負責接收響應遠端RPC消息請求。有兩個實現:
RpcServer的啟動是通知底層的AkkaRpcActor切換為START狀態,開始處理遠程調用請求:
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint,RpcServer { @Override public void start() { rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender()); }}AkkaRpcActor是Akka的具體實現,主要負責處理如下類型消息:
1)本地Rpc調用LocalRpcInvocation
會指派給RpcEndpoint進行處理,如果有響應結果,則將響應結果返還給Sender。
2)RunAsync & CallAsync
這類消息帶有可執行的代碼,直接在Actor的線程中執行。
3)控制消息ControlMessages
用來控制Actor行為,START啟動,STOP停止,停止後收到的消息會丟棄掉。
RPC通信過程分為請求和響應。
1、 RPC請求發送
在RpcService中調用connect()方法與對端的RpcEndpoint建立連接,connect()方法根據給的地址返回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是RpcServer的實現)。
前面分析到客戶端提供代理對象RpcServer,代理對象會調用AkkaInvocationHandler的invoke方法並傳入RPC調用的方法和參數信息,代碼如下:
AkkaInvocationHandler.java
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass =method.getDeclaringClass(); Object result;// 判斷方法所屬的classif(declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class)|| declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if(declaringClass.equals(FencedRpcGateway.class)) {throw new UnsupportedOperationException("AkkaInvocationHandler does not support thecall FencedRpcGateway#" + method.getName() + ". This indicates thatyou retrieved a FencedRpcGateway without specifying a " +"fencingtoken. Please use RpcService#connect(RpcService, F, Time) with F being thefencing token to " +"retrieve aproperly FencedRpcGateway."); } else { // rpc調用 result = invokeRpc(method, args); }return result;}代碼中判斷所屬的類,如果是RPC方法,則調用invokeRpc方法。將方法調用封裝為RPCInvocation消息。如果是本地則生成LocalRPCInvocation,本地消息不需要序列化,如果是遠程調用則創建RemoteRPCInvocation。
判斷遠程方法調用是否需要等待結果,如果無需等待(void),則使用向Actor發送tell類型的消息,如果需要返回結果,則向Acrot發送ask類型的消息,代碼如下:
AkkaInvocationHandler.java
private Object invokeRpc(Method method, Object[]args) throws Exception { String methodName = method.getName(); Class<?>[] parameterTypes =method.getParameterTypes(); Annotation[][] parameterAnnotations =method.getParameterAnnotations(); Time futureTimeout =extractRpcTimeout(parameterAnnotations, args, timeout); final RpcInvocation rpcInvocation =createRpcInvocationMessage(methodName, parameterTypes, args); Class<?> returnType =method.getReturnType(); final Object result; if (Objects.equals(returnType, Void.TYPE)) { tell(rpcInvocation); result = null; } else { final ThrowablecallStackCapture = captureAskCallStack ? new Throwable() : null; final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout); final CompletableFuture<Object> completableFuture = newCompletableFuture<>(); resultFuture.whenComplete((resultValue,failure) -> { if (failure != null) { completableFuture.completeExceptionally(resolveTimeoutException(failure,callStackCapture, method)); } else { completableFuture.complete(deserializeValueIfNeeded(resultValue,method)); } }); if (Objects.equals(returnType,CompletableFuture.class)) { result =completableFuture; } else { try { result = completableFuture.get(futureTimeout.getSize(),futureTimeout.getUnit()); } catch(ExecutionException ee) { throw new RpcException("Failure while obtaining synchronous RPC result.",ExceptionUtils.stripExecutionException(ee)); } } } return result;}2、 RPC請求響應
RPC消息通過RpcEndpoint所綁定的Actor的ActorRef發送的,AkkaRpcActor是消息接收的入口,AkkaRpcActor在RpcEndpoint中構造生成,負責將消息交給不同的方法進行處理。
AkkaRpcActor.java
public Receive createReceive() { return ReceiveBuilder.create() .match(RemoteHandshakeMessage.class,this::handleHandshakeMessage) .match(ControlMessages.class,this::handleControlMessage) .matchAny(this::handleMessage) .build();}接收的消息有3種:
1)握手消息
在客戶端構造時會通過ActorSelection發送過來。收到消息後檢查接口、版本是否匹配。
AkkaRpcActor.java
private void handleHandshakeMessage(RemoteHandshakeMessagehandshakeMessage) { if(!isCompatibleVersion(handshakeMessage.getVersion())) { sendErrorIfSender(newAkkaHandshakeException( String.format( "Versionmismatch between source (%s) and target (%s) rpc component. Please verify thatall components have the same version.", handshakeMessage.getVersion(), getVersion()))); } else if(!isGatewaySupported(handshakeMessage.getRpcGateway())) { sendErrorIfSender(newAkkaHandshakeException( String.format( "The rpcendpoint does not support the gateway %s.", handshakeMessage.getRpcGateway().getSimpleName()))); } else { getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE),getSelf()); }}2)控制消息
在RpcEndpoint調用start方法後,會向自身發送一條Processing.START消息來轉換當前Actor的狀態為STARTED,STOP也類似,並且只有在Actor狀態為STARTED時才會處理RPC請求。
AkkaRpcActor.java
private void handleControlMessage(ControlMessages controlMessage) { try { switch (controlMessage) { case START: state =state.start(this); break; case STOP: state =state.stop(); break; case TERMINATE: state =state.terminate(this); break; default: handleUnknownControlMessage(controlMessage); } } catch (Exception e) { this.rpcEndpointTerminationResult= RpcEndpointTerminationResult.failure(e); throw e; }}3)RPC消息
通過解析RpcInvocation獲取方法名和參數類型,並從RpcEndpoint類中找到Method對象,通過反射調用該方法。如果有返回結果,會以Akka消息的形式發送回發送者。
AkkaRpcActor.java
private void handleMessage(final Object message) { if (state.isRunning()) { mainThreadValidator.enterMainThread(); try { handleRpcMessage(message); } finally { mainThreadValidator.exitMainThread(); } } else { log.info("The rpcendpoint {} has not been started yet. Discarding message {} until processing isstarted.", rpcEndpoint.getClass().getName(), message.getClass().getName()); sendErrorIfSender(newAkkaRpcException( String.format("Discardmessage, because the rpc endpoint %s has not been started yet.",rpcEndpoint.getAddress()))); }}「還不過癮?一頭霧水?關注公眾號,回復Flink,即能獲取全部詳細視頻講解。」
微信號|bigdata_story
B站|大數據那些事
想獲取更多更全資料
掃碼加好友入群
歡迎各位大佬加入開源共享
共同面對大數據領域疑難問題
來稿請投郵箱:miaochuanhai@126.com