詳解Flink組件通信——RPC協議

2022-01-23 大數據那些事

收錄於話題 #Flink 12個

Flink組件通訊過程

RPC(本地/遠程)調用,底層是通過Akka提供的tell/ask方法進行通信。

Flink中RPC框架中涉及的主要類:

Flink的RPC協議通過RpcGateway來定義,主要定義通信行為;用於遠程調用RpcEndpoint的某些方法,可以理解為客服端代理。

若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通信時,必須讓其提供對應地址。

從類繼承圖可以看到基本上所有組件都實現了RpcGateway接口,其代碼如下:

public interface RpcGateway {String getAddress();String getHostname();}

RpcEndpoint是通信終端,提供RPC服務組件的生命周期管理(start、stop)。每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,其構造函數如下:

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService =checkNotNull(rpcService, "rpcService");this.endpointId =checkNotNull(endpointId, "endpointId");this.rpcServer =rpcService.startServer(this);this.mainThreadExecutor= new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);}

構造的時候調用rpcService.startServer()啟動RpcServer,進入可以接收處理請求的狀態,最後將RpcServer綁定到主線程上真正執行起來。

在RpcEndpoint中還定義了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法來執行Rpc調用,值得注意的是在Flink的設計中,對於同一個Endpoint,所有的調用都運行在主線程,因此不會有並發問題,當啟動RpcEndpoint/進行Rpc調用時,其會委託RcpServer進行處理。

Akka有兩種核心的異步通信方式:tell和ask。

RpcService 和 RpcServer是RpcEndPoint的成員變量。

1)RpcService 是Rpc服務的接口,其主要作用如下:

根據提供的RpcEndpoint來啟動和停止RpcServer(Actor);

根據提供的地址連接到RpcServer,並返回一個RpcGateway;

延遲/立刻調度Runnable、Callable;

在Flink中實現類為AkkaRpcService,是 Akka 的 ActorSystem 的封裝,基本可以理解成 ActorSystem 的一個適配器。在ClusterEntrypoint(JobMaster)和TaskManagerRunner(TaskExecutor)啟動的過程中初始化並啟動。

AkkaRpcService中封裝了ActorSystem,並保存了ActorRef到RpcEndpoint的映射關係。RpcService跟RpcGateway類似,也提供了獲取地址和埠的方法。

在構造RpcEndpoint時會啟動指定rpcEndpoint上的RpcServer,其會根據RpcEndpoint類型(FencedRpcEndpoint或其他)來創建不同的AkkaRpcActor(FencedAkkaRpcActor或AkkaRpcActor),並將RpcEndpoint和AkkaRpcActor對應的ActorRef保存起來,AkkaRpcActor是底層Akka調用的實際接收者,RPC的請求在客戶端被封裝成RpcInvocation對象,以Akka消息的形式發送。

最終使用動態代理將所有的消息轉發到InvocationHandler,具體代碼如下:

public <Cextends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {... ...@SuppressWarnings("unchecked")   RpcServerserver = (RpcServer) Proxy.newProxyInstance(     classLoader,     implementedRpcGateways.toArray(newClass<?>[implementedRpcGateways.size()]),     akkaInvocationHandler);return server;}

2)RpcServer負責接收響應遠端RPC消息請求。有兩個實現:

RpcServer的啟動是通知底層的AkkaRpcActor切換為START狀態,開始處理遠程調用請求:

class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint,RpcServer {         @Override         public void start() {                  rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender());         }}

AkkaRpcActor是Akka的具體實現,主要負責處理如下類型消息:

1)本地Rpc調用LocalRpcInvocation

會指派給RpcEndpoint進行處理,如果有響應結果,則將響應結果返還給Sender。

2)RunAsync & CallAsync

這類消息帶有可執行的代碼,直接在Actor的線程中執行。

3)控制消息ControlMessages

用來控制Actor行為,START啟動,STOP停止,停止後收到的消息會丟棄掉。

RPC通信過程分為請求和響應。

1、 RPC請求發送

在RpcService中調用connect()方法與對端的RpcEndpoint建立連接,connect()方法根據給的地址返回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是RpcServer的實現)。

前面分析到客戶端提供代理對象RpcServer,代理對象會調用AkkaInvocationHandler的invoke方法並傳入RPC調用的方法和參數信息,代碼如下:

AkkaInvocationHandler.java

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?> declaringClass =method.getDeclaringClass(); Object result;// 判斷方法所屬的classif(declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class)|| declaringClass.equals(RpcServer.class)) { result = method.invoke(this, args); } else if(declaringClass.equals(FencedRpcGateway.class)) {throw new UnsupportedOperationException("AkkaInvocationHandler does not support thecall FencedRpcGateway#" + method.getName() + ". This indicates thatyou retrieved a FencedRpcGateway without specifying a " +"fencingtoken. Please use RpcService#connect(RpcService, F, Time) with F being thefencing token to " +"retrieve aproperly FencedRpcGateway."); } else { // rpc調用 result = invokeRpc(method, args); }return result;}

代碼中判斷所屬的類,如果是RPC方法,則調用invokeRpc方法。將方法調用封裝為RPCInvocation消息。如果是本地則生成LocalRPCInvocation,本地消息不需要序列化,如果是遠程調用則創建RemoteRPCInvocation。

判斷遠程方法調用是否需要等待結果,如果無需等待(void),則使用向Actor發送tell類型的消息,如果需要返回結果,則向Acrot發送ask類型的消息,代碼如下:

AkkaInvocationHandler.java

private Object invokeRpc(Method method, Object[]args) throws Exception {         String methodName = method.getName();         Class<?>[] parameterTypes =method.getParameterTypes();         Annotation[][] parameterAnnotations =method.getParameterAnnotations();         Time futureTimeout =extractRpcTimeout(parameterAnnotations, args, timeout);          final RpcInvocation rpcInvocation =createRpcInvocationMessage(methodName, parameterTypes, args);          Class<?> returnType =method.getReturnType();          final Object result;          if (Objects.equals(returnType, Void.TYPE)) {                  tell(rpcInvocation);                   result = null;         } else {                                                                        final ThrowablecallStackCapture = captureAskCallStack ? new Throwable() : null;                                                       final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);                   final CompletableFuture<Object> completableFuture = newCompletableFuture<>();                  resultFuture.whenComplete((resultValue,failure) -> {                          if (failure != null) {                                   completableFuture.completeExceptionally(resolveTimeoutException(failure,callStackCapture, method));                          } else {                                   completableFuture.complete(deserializeValueIfNeeded(resultValue,method));                          }                  });                   if (Objects.equals(returnType,CompletableFuture.class)) {                                                    result =completableFuture;                  } else {                          try {                                                                      result = completableFuture.get(futureTimeout.getSize(),futureTimeout.getUnit());                          } catch(ExecutionException ee) {                                   throw new RpcException("Failure while obtaining synchronous RPC result.",ExceptionUtils.stripExecutionException(ee));                          }                  }         }          return result;}

2、 RPC請求響應

RPC消息通過RpcEndpoint所綁定的Actor的ActorRef發送的,AkkaRpcActor是消息接收的入口,AkkaRpcActor在RpcEndpoint中構造生成,負責將消息交給不同的方法進行處理。

AkkaRpcActor.java

public Receive createReceive() {         return ReceiveBuilder.create()                  .match(RemoteHandshakeMessage.class,this::handleHandshakeMessage)                  .match(ControlMessages.class,this::handleControlMessage)                  .matchAny(this::handleMessage)                  .build();}

接收的消息有3種:

1)握手消息

在客戶端構造時會通過ActorSelection發送過來。收到消息後檢查接口、版本是否匹配。

AkkaRpcActor.java

private void handleHandshakeMessage(RemoteHandshakeMessagehandshakeMessage) {         if(!isCompatibleVersion(handshakeMessage.getVersion())) {                                    sendErrorIfSender(newAkkaHandshakeException(                          String.format(                                   "Versionmismatch between source (%s) and target (%s) rpc component. Please verify thatall components have the same version.",                                   handshakeMessage.getVersion(),                                   getVersion())));         } else if(!isGatewaySupported(handshakeMessage.getRpcGateway())) {                                    sendErrorIfSender(newAkkaHandshakeException(                          String.format(                                   "The rpcendpoint does not support the gateway %s.",                                   handshakeMessage.getRpcGateway().getSimpleName())));         } else {                  getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE),getSelf());         }}

2)控制消息

在RpcEndpoint調用start方法後,會向自身發送一條Processing.START消息來轉換當前Actor的狀態為STARTED,STOP也類似,並且只有在Actor狀態為STARTED時才會處理RPC請求。

AkkaRpcActor.java

private void handleControlMessage(ControlMessages controlMessage) {         try {                  switch (controlMessage) {                          case START:                                   state =state.start(this);                                   break;                          case STOP:                                   state =state.stop();                                   break;                          case TERMINATE:                                   state =state.terminate(this);                                   break;                          default:                                   handleUnknownControlMessage(controlMessage);                  }         } catch (Exception e) {                  this.rpcEndpointTerminationResult= RpcEndpointTerminationResult.failure(e);                  throw e;         }}

3)RPC消息

通過解析RpcInvocation獲取方法名和參數類型,並從RpcEndpoint類中找到Method對象,通過反射調用該方法。如果有返回結果,會以Akka消息的形式發送回發送者。

AkkaRpcActor.java

private void handleMessage(final Object message) {         if (state.isRunning()) {                  mainThreadValidator.enterMainThread();                   try {                          handleRpcMessage(message);                  } finally {                          mainThreadValidator.exitMainThread();                  }         } else {                  log.info("The rpcendpoint {} has not been started yet. Discarding message {} until processing isstarted.",                          rpcEndpoint.getClass().getName(),                          message.getClass().getName());                   sendErrorIfSender(newAkkaRpcException(                          String.format("Discardmessage, because the rpc endpoint %s has not been started yet.",rpcEndpoint.getAddress())));         }}

「還不過癮?一頭霧水?關注公眾號,回復Flink,即能獲取全部詳細視頻講解。」

微信號|bigdata_story

B站|大數據那些事

想獲取更多更全資料

掃碼加好友入群

歡迎各位大佬加入開源共享

共同面對大數據領域疑難問題

來稿請投郵箱:miaochuanhai@126.com

相關焦點

  • Flink 最鋒利的武器:Flink SQL 入門和實戰
    新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能與 Kafka 一起使用。舊描述符 org.apache.flink.table.descriptors.OldCsv 用於文件系統連接器。
  • Flink-1.12.0 CEP詳解與實戰
    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}<
  • Modbus通信協議
    ModBus網絡只有一個主機,所有通信都由他發出。網絡可支持247個之多的遠程從屬控制器,但實際所支持的從機數要由所用通信設備決定。採用這個系統,各PC可以和中心主機交換信息而不影響各PC執行本身的控制任務。了解Modbus通信協議是怎麼回事,在現場就可以用各種第三方的小軟體做通信測試了。
  • pytorch編程之分布式 RPC 框架入門
    例如,一個簡單的優化可能是將當前狀態和最後的報酬打包到一個 RPC 中,以減少通信開銷。但是,目標是演示 RPC API,而不是為 CartPole 構建最佳的求解器。因此,在此示例中,讓邏輯保持簡單,並明確兩個步驟。
  • 用了 History Server,媽媽再也不用擔心我的 Flink 作業半夜掛了
    # flink job 運行完成後的日誌存放目錄jobmanager.archive.fs.dir: hdfs:///flink/history-log# The address under which the web-based HistoryServer listens.
  • 最詳細的流媒體傳輸協議-rtsp協議詳解
    流媒體傳輸協議-rtsp協議詳解參閱:RTSP協議詳解和分析從零開始寫一個RTSP伺服器(一)RTSP協議講解關於RTSP_RTP_RTCP協議的深刻初步介紹rtspRTSP出現以前,最熱的大概就是HTTP協議。
  • Flink Checkpoint問題排查實用指南
    2019-09-02 16:26:20,972 INFO [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522
  • 一篇文章了解RPC框架原理
    1.RPC框架的概念RPC(Remote Procedure Call)–遠程過程調用,通過網絡通信調用不同的服務,共同支撐一個軟體系統,微服務實現的基石技術
  • 詳解Flink通訊模型——Akka與Actor模型
    Flink通過Akka進行的分布式通信的實現,在0.9版中採用。使用Akka,所有遠程過程調用現在都實現為異步消息。這主要影響組件JobManager,TaskManager 和JobClient。將來,甚至有可能將更多的組件轉換為參與者,從而允許它們發送和處理異步消息。
  • RS485與Modbus通信協議教程
    【導讀】1979年施耐德電氣制定了一個用於工業現場的總線協議Modbus協議,現在工業中使用RS485通信場合很多都採用Modbus協議,所以今天我們來了解下
  • RS485與Modbus通信協議教程!
    【導讀】1979年施耐德電氣制定了一個用於工業現場的總線協議Modbus協議,現在工業中使用RS485通信場合很多都採用
  • RS485與Modbus通信協議匯總!
    1979年施耐德電氣制定了一個用於工業現場的總線協議Modbus協議,現在工業中使用RS485通信場合很多都採用Modbus協議,所以今天我們來了解下RS485通信和Modbus通信協議。2、接口使用的信號線與其他設備形成共地模式的通信,這種共地模式傳輸容易產生幹擾,並且抗幹擾性能也比較弱。3、傳輸距離、速率都有限,最多只能通信幾十米;只能兩點之間進行通信,不能夠實現多機聯網通信。
  • 螞蟻金服通信框架SOFABolt解析 |序列化機制(Serializer)
    本文為《螞蟻金服通信框架SOFABolt解析》系列第二篇,作者魯道,就職於 E 籤寶。《螞蟻金服通信框架SOFABolt解析》系列由 SOFA 團隊和源碼愛好者們出品。設計及實現一個優秀的網絡通信框架,必然要有一個靈活的,高性能的序列化機制。那麼,SOFABolt 序列化機制的設計目標是什麼呢?具體又是如何設計的呢?首先說靈活,靈活指的是,框架的使用方(這裡指的是網絡通信框架的使用方,例如 RPC,消息中心等中間件)能夠自定義自己的實現,即用戶決定使用什麼類型的序列化以及怎麼序列化。
  • GDB 調試實戰之 Redis 通信協議
    時下的業界,相對於傳統的關係型資料庫,以 key-value 思想實現的 NoSQL 內存資料庫非常流行,而提到內存資料庫
  • Ambassador 0.52 新特性:會話親和性、負載均衡控制、gRPC-Web
    作者:Richard Li 譯者:白小白 原文:http://t.cn/E6cZoyG現時的雲原生應用由多種異構的服務或者微服務組成,服務間、服務與客戶端之間的通信需要跨越浩繁的通信協議和拓撲結構。Ambassador就是部署在這樣不斷增長的異構的工作負載環境之下,也因此我們對於這種境況有著直接的認知。
  • Flink SQL 實戰:雙流 join 場景應用
    DROP TABLE IF EXISTS flink_rtdw.demo.adsdw_dwd_max_click_mobileapp;CREATE TABLE flink_rtdw.demo.adsdw_dwd_max_click_mobileapp ( ...
  • Flink Back Pressure(背壓)是怎麼實現的?有什麼絕妙之處?
    Reference:https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/back_pressure.htmlhttps://www.da-platform.com/blog/how-flink-handles-backpressure本文地址:https://
  • 深入解析ZNBase分布式SQL引擎架構的五大服務組件
    但是在協議層,只要是不涉及到存儲的部分,本質還是單機實例的 SQL 引擎,不涉及分布式存儲和分布式計算,這樣就和傳統資料庫兼容性非常高。浪潮雲溪 NewSQL 資料庫 ZNBase 完美地繼承了 Spanner 的設計理念,實現了基於對等架構的分布式 SQL 引擎。
  • 小知識 | HART協議詳解
    .   —— FeildcommgroupHART協議的概念  HART英文全稱為Highway Addressable Remote Transducer,中文被譯為可尋址遠程傳感器高速通道的開放通信協議,是美國ROSEMOUNT公司於1985年推出的一種用於現場智能儀表和控制室設備之間的通信協議。