基於Netty的高性能RPC框架Nifty 服務端啟動全解析

2020-10-09 無碼可編

1. 前言

Thrift是Facebook貢獻給apache的rpc框架,但是這款框架的java版本在公司內部並不是那麼受待見,因為其性能相比C++版本差了很多,但是後續基於netty重寫了以後性能得到了極大的提升,相比於C++版本已經差距不大了。為此取了個新的名字Nifty = Netty + Thrift。

如果你使用過thrift的話,基本都會使用自動生成的代碼,那真的是沒法看,即使定義一個簡單的類都會生成巨多的代碼,把read,write方法全部寫到裡面去了。總之早期的thrfit各方面都似乎不那麼友好。後面架構進行了升級,提供了新的swift庫,注意這個不是ios的swift,從而生成的java類和普通的java類基本一致,無非多了點註解,而序列化反序列化也都移到了相應的包中,從而使得我們的代碼非常簡潔易懂。

其實這款rpc框架的性能是非常不錯的,早幾年性能是好過grpc的,目前小米還是在用的。這款框架很輕量,即不提供服務治理的功能。如果公司規模不大急需做功能,暫時沒精力去做服務治理的話可能還是會選擇dubbo等帶服務治理功能的rpc框架。但是恰恰是thrift不提供服務治理,這樣公司可以自己去定義服務治理的功能。

目前不管是書籍還是博客等關於thrift的都是少之又少,最近突然愛學習了,所以打算寫一系列thrift相關的博客,關於使用基本不會介紹的過多,因為基本使用也就十幾行代碼,主要是介紹內部處理邏輯。具體的包括Thrift框架分析,netty框架分析,分布式服務治理等三個方面。

為了方便後續統稱為Thrift而不是Nifty,因為很多代碼還是沿用的Thrift。

2. Thrfit服務端創建與核心組件介紹

EchoServiceImpl service = new EchoServiceImpl();ThriftCodecManager manager = new ThriftCodecManager();ThriftServiceProcessor processor = new ThriftServiceProcessor(manager, ImmutableList.of(), service);ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));server.start();複製代碼

其中EchoServiceImpl是使用swift代碼生成器生成的接口的實現類,有個echo方法,簡單說下這幾個組件。

  1. ThriftCodecManager:編解碼器的管理類,會將各類編解碼器,比如StringCodec添加到緩存中。
  2. ThriftServiceProcessor:thrift服務處理器,服務端收到數據後,最終將由這個類來進行處理,可以理解為最核心的類了。
  3. ThriftServer: thrift伺服器,主要用來設置參。啟動服務,說具體點就是設置好netty的一些處理器等參數,然後啟動netty服務。
  4. ThriftServerConfig: Thrfit服務啟動的一些配置類,包括了埠號,線程池,線程數量等。

3. 服務端啟動流程極簡介紹

從上面的組件分析是不是能猜到一點thrift的內部處理流程。簡單兩句話就是,創建各類自定義處理器handler,添加到netty的處理器集合中,然後啟動netty服務。當收到客戶端發來的數據後,交由特定處理器進行數據的處理,根據協議和編解碼器從buffer中進行數據的解析和轉換,最終得到類名,方法的參數和方法名等(各類信息都能解析到);從ThriftServicProcessor查到Method,傳入方法參數,反射執行得到結果,然後將結果通過netty響應給客戶端。

4. 服務端啟動全解析

先從ThriftServer創建和啟動來看,了解總體的流程,後續再回過頭來看各個組件的處理流程。

ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));public ThriftServer(NiftyProcessor processor, ThriftServerConfig config){ this(processor, config, new NiftyTimer("thrift"));}複製代碼

繼續往下,追到最終的構造方法

public ThriftServer( final NiftyProcessor processor, ThriftServerConfig config, @ThriftServerTimer Timer timer, Map<String, ThriftFrameCodecFactory> availableFrameCodecFactories, Map<String, TDuplexProtocolFactory> availableProtocolFactories, @ThriftServerWorkerExecutor Map<String, ExecutorService> availableWorkerExecutors, NiftySecurityFactoryHolder securityFactoryHolder){ NiftyProcessorFactory processorFactory = new NiftyProcessorFactory(){ @Override public NiftyProcessor getProcessor(TTransport transport) { return processor; } }; String transportName = config.getTransportName(); String protocolName = config.getProtocolName(); checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName); checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName); configuredPort = config.getPort(); workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors); acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build()); acceptorThreads = config.getAcceptorThreadCount(); ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build()); ioThreads = config.getIoThreadCount(); serverChannelFactory = new NioServerSocketChannelFactory(new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT), new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT)); ThriftServerDef thriftServerDef = ThriftServerDef.newBuilder() .name("thrift") .listen(configuredPort) .limitFrameSizeTo((int) config.getMaxFrameSize().toBytes()) .clientIdleTimeout(config.getIdleConnectionTimeout()) .withProcessorFactory(processorFactory) .limitConnectionsTo(config.getConnectionLimit()) .limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection()) .thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName)) .protocol(availableProtocolFactories.get(protocolName)) .withSecurityFactory(securityFactoryHolder.niftySecurityFactory) .using(workerExecutor) .taskTimeout(config.getTaskExpirationTimeout()) .withSecurityFactory(config.getSecurityFactory()) .withHeader(config.getHeader()) .withServerHandler(config.getNiftyServerHandler()) .build(); NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder(); nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog()); nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount()); nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount()); nettyServerConfigBuilder.setTimer(timer); NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build(); transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);}複製代碼

構造方法簡單來說主要做了兩件事:

  1. 創建和獲取netty需要的連接線程池和io處理線程池以及數量,從而構建netty服務組件NioServerSocketChannelFactory;
  2. 構建netty服務的配置類nettyServerConfig, thrift服務的配置類ThriftServerDef

我們再來看下細節點的東西,思考一些問題,如果不感興趣可以跳過。

工作線程池workerExecutor的創建:

workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);複製代碼

默認傳進來的availableWorkerExecutors是空的,所以最終是構建一個新的線程池,

最終調用的方法是makeDefaultWorkerExecutor,下面的代碼稍微簡化了一點。

  • 默認得到的就是個無解的隊列;
  • 如果你需要構建有限容量隊列的線程池,可以在創建config後調用setMaxQueuedRequests來設置隊列容量
  • 超出隊列容量後將執行線程池拒絕策略(throw RejectedExecutionException)
  • 默認核心線程數和最大線程數是一樣的,數值為200;
  • 使用guava提供的ThreadFactoryBuilder來構建線程工廠,主要是取個容易理解的名字;在thrift中大量使用了guava提供的工具類。

private ExecutorService makeDefaultWorkerExecutor(){ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); return new ThreadPoolExecutor(getWorkerThreads(), getWorkerThreads(), 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(), new ThreadPoolExecutor.AbortPolicy());}複製代碼

netty的連接線程池和io線程池創建

acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());acceptorThreads = config.getAcceptorThreadCount();ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());ioThreads = config.getIoThreadCount();複製代碼

  • 該線程池創建的方法是netty提供的,看名字是不是像無界隊列的線程池呢?
  • 連接數量的限定是1,io線程數量的限定默認是電腦核數*2,但都是可以在配置類中指定的。

關於netty配置類NettyServerConfig的構建nettyServerConfigBuilder.build(), 在之前設置了連接線程的線程數和io線程池線程數以及定時器timer,調用build後如下:

public NettyServerConfig build() { Timer timer = getTimer(); ExecutorService bossExecutor = getBossExecutor(); int bossThreadCount = getBossThreadCount(); ExecutorService workerExecutor = getWorkerExecutor(); int workerThreadCount = getWorkerThreadCount(); return new NettyServerConfig( getbootstrapOptions(), timer != null ? timer : new NiftyTimer(threadNamePattern("")), bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(), bossThreadCount, workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(), workerThreadCount );}private ExecutorService buildDefaultBossExecutor() { return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-boss-%s")));}private ExecutorService buildDefaultWorkerExecutor() { return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-worker-%s")));}複製代碼

由於沒有設置兩個線程池,所以會設置默認的線程池,注意這裡一個是Boss線程池一個是Worker線程池。 其中的timer在後續構建netty處理器的時候會多次用到。

allChannels是一個netty提供的channelGroup:

private final DefaultChannelGroup allChannels = new DefaultChannelGroup();複製代碼

在介紹後續流程前,先了解下netty服務端創建步驟,因為這裡用的是netty3,和我們比較熟悉的netty4差距有點大,可以對比著看下。

ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), );ServerBootstrap bootstrap = new ServerBootstrap(factory);bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("handler1", new Handler1()); return pipeline; }});Channel channel = bootstrap.bind(new InetSocketAddress(8081));複製代碼

構建NettyServerTransport的時候,在構造方法裡面就是進行netty的一些設置。

transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);複製代碼

在這裡面主要是進行一些對象成員變量的設置,最後也是最重要的就是構建ChannelPipelineFactory,在其中設置各種處理器,大部分繼承自SimpleChannelUpstreamHandler,少部分繼承自ChannelDownstreamHandler,可以類比netty4的ChannelInboundHandler,ChannelOutboundHandler。

this.pipelineFactory = new ChannelPipelineFactory(){ @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline cp = Channels.pipeline(); // 設置處理器handler return cp; }}複製代碼

我們來看有哪些處理器。

  1. ConnectionLimiter: 連接限流器, 創建的時候需要指定最大連接數,以及初始值為0的一個計數器;每次建立連接的時候計數器數值 + 1,關閉的時候數值 - 1;當連接數達到上限,則關閉通道,即channel。
  2. ChannelStatistics: 傳入allChannels來構建對象,內部持有一個channelCount = 0,來統計建立通道channel數, 每次建立連接接受到數據的時候channelCount + 1,提供了get方法來獲取channelCount;同時將channel加入到allChannels中。
  3. 編解碼處理器DefaultThriftFrameCodec,每次收到的數據和傳出的數據都需要進行一次編碼或者解碼。
  4. 連接的上下文處理器ConnectionContextHandler,在建立連接的時候創建NiftyConnectionContext,連接的上下文環境,包含了removeAddress和屬性map,綁定到ChannelHandlerContext,即ctx.setAttachment(context);。
  5. netty提供的關於超時處理器IdleStateHandler和IdleDisconnectHandler。
  6. NiftyDispatcher,這個處理器最為核心,收到buffer經過解碼後數據就傳到了該處理器,該處理器會對數據進行一系列的處理和方法的調用等。
  7. 異常事件處理器NiftyExceptionLogger,重寫了log方法,如果出現異常事件,該處理器會列印相應的異常日誌。

來一覽處理過程吧

public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels){ this.def = def; this.nettyServerConfig = nettyServerConfig; this.port = def.getServerPort(); this.allChannels = allChannels; final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections()); this.channelStatistics = new ChannelStatistics(allChannels); this.pipelineFactory = new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline cp = Channels.pipeline(); TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory(); NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig); cp.addLast("connectionContext", new ConnectionContextHandler()); cp.addLast("connectionLimiter", connectionLimiter); cp.addLast(ChannelStatistics.NAME, channelStatistics); cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(), inputProtocolFactory)); if (def.getClientIdleTimeout() != null) { // Add handlers to detect idle client connections and disconnect them cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(), def.getClientIdleTimeout().toMillis(), NO_WRITER_IDLE_TIMEOUT, NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS)); cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler()); } cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer())); cp.addLast("exceptionLogger", new NiftyExceptionLogger()); return cp; } }; }複製代碼

我們再回到server.start(),看下ThriftServer如何啟動的

ThriftServer: 初始狀態是NOT_STARTED

public synchronized ThriftServer start(){ checkState(state != State.CLOSED, "Thrift server is closed"); if (state == State.NOT_STARTED) { transport.start(serverChannelFactory); state = State.RUNNING; } return this;}複製代碼

NettyServerTransport: 標準的netty服務端創建過程,其中pipelineFactory就是在前面NettyServerTransport構造方法中所創建的。

public void start(ServerChannelFactory serverChannelFactory){ bootstrap = new ServerBootstrap(serverChannelFactory); bootstrap.setOptions(nettyServerConfig.getBootstrapOptions()); bootstrap.setPipelineFactory(pipelineFactory); serverChannel = bootstrap.bind(new InetSocketAddress(port)); // ... }複製代碼

到這裡Thrift的服務端啟動就介紹完了,下一部分將會介紹服務端接受數據,處理數據和響應結果的流程。


作者:可以回家加班嗎
連結:https://juejin.im/post/6879786471901888525
來源:掘金
著作權歸作者所有。商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。

相關焦點

  • 手寫一個基於Netty+Kyro+Zookeeper的RPC框架,項目經驗妥妥拿下
    介紹這是一款基於 Netty+Kyro+Zookeeper 實現的 RPC 框架。一個基本的 RPC 框架設計思路注意 :我們這裡說的 RPC 框架指的是:可以讓客戶端直接調用服務端方法就像調用本地方法一樣簡單的框架,比如我前面介紹的 Dubbo、Motan、gRPC 這些。
  • Netty實現高性能RPC伺服器優化篇之消息序列化
    有關如何利用Netty開發實現,高性能RPC伺服器的一些設計思路、設計原理,以及具體的實現方案。在文章的最後提及到,其實基於該方案設計的RPC伺服器的處理性能,還有優化的餘地。、定製Netty的RPC服務端、客戶端,採用何種序列化來進行RPC消息對象的網絡傳輸。
  • 基於Netty高性能RPC框架Nifty協議、傳輸層、編解碼
    ThriftCodecManager與對象讀取在編寫服務端代碼的時候,我們創建了ThriftCodecManager這個對象,該對象是用來管理編解碼器ThriftCodec<?>的,在初始化的時候會創建各種類型的編解碼器放到緩存中, 以供服務處理器ThrfitServiceProcessor使用。接下來我們就深入分析這個編解碼器管理器。
  • grpc-example 基於gRPC實現的簡單rpc框架
    grpc-example 基於gRPC實現的簡單rpc框架基於gRPC實現的簡單rpc框架,本身gRpc已經是一個全功能的通訊框架,基於http/2.0標準協議可以實現更好的性能。ServiceHandler serviceHandler; public GrpcServer(int port) { this.serviceHandler = new ServiceHandler(); this.server = ServerBuilder.forPort(port) // 將具體實現的服務添加到gRPC服務中 .addService(new GrpcServerHandler
  • 怎樣用Java去編寫基於netty的RPC框架呢?
    ,是一個高性能,異步事件驅動的NIO框架,基於JAVA NIO提供的API實現,他提供了TCP UDP和文件傳輸的支持,,所有操作都是異步非阻塞的.通過Futrue-Listener機制,本質就是Reactor模式的現實,Selector作為多路復用器,EventLoop作為轉發器,而且,netty對NIO中buffer做優化,大大提高了性能Netty中Bootstrap和Channel
  • 手寫一個 RPC 框架。畢設/項目經驗穩了
    原文連結:https://mp.weixin.qq.com/s/Avq4JBT-6-Dxgl7q8M251Qguide-rpc-framework 是一個 一款基於 Netty+Kyro+zookeeper 實現的自定義 RPC 框架。
  • RPC 框架,底層到底什麼原理?
    2.5 服務端數據接受這一塊使用netty,可以快速一個高性能、高可靠的一個服務端。關於netty網上學習的資料很多,這裡也只是宏觀的講解RPC原理,就不展開。2.6 真實調用服務端獲取客戶端請求的數據後, 調用請求中的方法,方法參數值,通過反射調用真實的方法,獲取其返回值,將其序列化封裝,通過netty進行數據返回,客戶端在接受數據並解析,這就完成了一次rpc請求調用的全過程。
  • 手寫了一個RPC框架,成功幫助讀者斬獲字節等大廠offer
    介紹guide-rpc-framework 是一款基於 Netty+Kyro+Zookeeper 實現的 RPC 框架。一個基本的 RPC 框架設計思路注意 :我們這裡說的 RPC 框架指的是:可以讓客戶端直接調用服務端方法就像調用本地方法一樣簡單的框架,比如我前面介紹的 Dubbo、Motan、gRPC 這些。
  • 手寫一個RPC框架,成功幫讀者斬獲字節、阿里等大廠Offer
    介紹guide-rpc-framework 是一款基於 Netty+Kyro+Zookeeper 實現的 RPC 框架。一個基本的 RPC 框架設計思路注意 :我們這裡說的 RPC 框架指的是:可以讓客戶端直接調用服務端方法就像調用本地方法一樣簡單的框架,比如我前面介紹的 Dubbo、Motan、gRPC 這些。
  • 我手寫完RPC框架,成功幫助讀者斬獲阿里等大廠offer
    介紹guide-rpc-framework 是一款基於 Netty+Kyro+Zookeeper 實現的 RPC 框架。一個基本的 RPC 框架設計思路注意 :我們這裡說的 RPC 框架指的是:可以讓客戶端直接調用服務端方法就像調用本地方法一樣簡單的框架,比如我前面介紹的 Dubbo、Motan、gRPC 這些。
  • go-netty 高性能網絡框架
    GO-NETTYgithub.com/go-netty/go-nettyIntroduction (介紹)go-netty is heavily inspired by netty
  • gRPC 1.21.0 發布,谷歌開源的高性能 RPC 框架
    grpc 1.21.0 發布了。
  • 如何自己設計一個RPC或HTTP協議服務框架
    你可以利用本文學習的內容去實現一下業務自己設計一個Tomcat容器;自己設計一款RPC框架;也可以在你的應用程式內部去在啟動一個通信服務。文章後面有演示。項目github地址關注私信: 01 自動回復一、通信框架設計要考慮的點通信肯定是雙方間的,客戶端發送數據,服務端處理數據。我們日常的開發都是基於http協議的,是不用考慮服務端和客戶端如何去發送數據的。
  • 從零學習netty網絡IO通訊開發框架
    性能 更好的吞吐量, 更低的延遲 資源消耗減少 最小化不必要的內存副本 netty可以運用在那些領域?1.網際網路行業典型的應用有:阿里分布式服務框架 Dubbo 的 RPC 框架使用 Dubbo 協議進行節點間通信,Dubbo 協議默認使用 Netty 作為基礎通信組件,用於實現各進程節點之間的內部通信。
  • gRPC 1.8.2 發布,Google 高性能 RPC 框架
    gRPC 1.8.2 已發布,該版本主要是修復 bug,具體如下:下載地址gRPC 是一個高性能、開源、通用的 RPC 框架,面向移動和 HTTP/2 設計
  • 手擼rpc框架,並基於spring進行二次註解開發
    一、rpc是什麼RPC是遠程過程調用(Remote Procedure Call)的縮寫形式。客戶端通過網絡傳輸,遠程調用服務端的函數,服務端處理客戶端的調用請求,然後將結果通過網絡傳輸返回客戶端。>:遠程服務端提供的方法接口。
  • 花椒服務端 gRPC 開發實踐
    gRPC 是一個高性能、通用的開源 RPC 框架,由 Google 開發並基於 HTTP/2 協議標準而設計,基於 ProtoBuf(Protocol Buffers)序列化協議開發,且支持當前主流開發語言。gRPC 通過定義一個服務並指定一個可以遠程調用的帶有參數和返回類型的的方法,使客戶端可以直接調用不同機器上的服務應用的方法,就像是本地對象一樣。
  • Guide自己動手寫了一個簡單的RPC框架
    Guide-rpc-framework 目前只實現了 RPC 框架最基本的功能,一些可優化點都在下面提到了,有興趣的小夥伴可以自行完善。介紹guide-rpc-framework 是一款基於 Netty+Kyro+zookeeper 實現的 RPC 框架。
  • 神煩,老大要我寫一個RPC框架
    這個時候,就需要「註冊中心」登場了,具體來說是這樣子的:服務提供者在啟動的時候,將自己應用所在機器的信息提交到註冊中心上面。服務消費者在啟動的時候,將需要消費的接口所在機器的信息抓回來。網絡協議,通俗理解,意思就是說我們的客戶端發送的數據應該長什麼樣子,服務端可以去解析出來知道要做什麼事情。
  • Golang之分布式 RPC 服務框架(rpcx)
    通過該協議程式設計師可以實現像調取本地函數一樣,調取遠程服務的函數。這裡介紹一個高效的rpc庫(rpcx)。rpcx 是一個分布式的Go語言的 RPC 框架,支持Zookepper、etcd、consul多種服務發現方式,多種服務路由方式, 是目前性能最好的 RPC 框架之一。