Thrift is an RPC framework that Facebook contributed to Apache. Its Java version, however, was never particularly well regarded inside the company, because its performance lagged far behind the C++ version. After the Java server was rewritten on top of Netty, performance improved dramatically and the gap with the C++ version became small, hence the new name: Nifty = Netty + Thrift.
If you have used Thrift, you have almost certainly worked with its generated code, and it is genuinely hard to read: even a simple class definition produces a huge amount of code, with the read and write methods all baked into the class. In short, early Thrift was not very friendly in many respects. The architecture was later upgraded with a new library called swift (not Apple's Swift for iOS). With it, the generated Java classes are essentially plain Java classes with a few extra annotations, and serialization/deserialization has moved into the corresponding library packages, which keeps our code clean and easy to follow.
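To give a sense of what that looks like, here is a minimal swift-style struct. This is a sketch for illustration only; the class name and field are made up, but the @ThriftStruct/@ThriftField annotation pattern is the one the swift codec library uses.

import com.facebook.swift.codec.ThriftField;
import com.facebook.swift.codec.ThriftStruct;

// A plain Java class; the swift codec reads these annotations at runtime
// instead of generating read()/write() methods into the class itself.
@ThriftStruct
public class EchoRequest {
    private String message;

    @ThriftField(1)
    public String getMessage() {
        return message;
    }

    @ThriftField
    public void setMessage(String message) {
        this.message = message;
    }
}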
The performance of this RPC framework is actually very good; a few years ago it outperformed gRPC, and Xiaomi still uses it today. The framework is lightweight in the sense that it provides no service-governance features. A smaller company that urgently needs to ship features and has no bandwidth to build service governance itself will probably pick an RPC framework that comes with it, such as Dubbo. But precisely because Thrift does not provide service governance, a company is free to define its own.
Books and blog posts about Thrift are few and far between. I have recently been in a studious mood, so I plan to write a series of Thrift-related posts. I will not spend much time on basic usage, since that is only a dozen or so lines of code; the focus will be on the internal processing logic, covering three areas: the Thrift framework, the Netty framework, and distributed service governance.
For convenience I will refer to it as Thrift rather than Nifty throughout, since much of the code still carries the Thrift name.
EchoServiceImpl service = new EchoServiceImpl();
ThriftCodecManager manager = new ThriftCodecManager();
ThriftServiceProcessor processor = new ThriftServiceProcessor(manager, ImmutableList.of(), service);
ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));
server.start();
Here EchoServiceImpl implements the interface produced by the swift code generator and has a single echo method. Let's briefly go over these components.
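As a rough illustration, the service contract and its implementation might look like the following. This is hypothetical, hand-written in the swift annotation style rather than taken from the article's generated code:

import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;

// The service contract: an ordinary Java interface plus swift annotations.
@ThriftService("echo")
public interface EchoService {
    @ThriftMethod
    String echo(String message);
}

// The implementation passed to ThriftServiceProcessor in the snippet above.
public class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String message) {
        return message;
    }
}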
Can you already guess part of Thrift's internal processing flow from these components? In two sentences: the server creates its various custom handlers, adds them to Netty's pipeline, and starts the Netty server. When data arrives from a client, the appropriate handler parses and converts it out of the buffer according to the protocol and codec, eventually recovering the class name, method name, and method arguments (all of this information can be parsed out); the Method is then looked up via ThriftServiceProcessor, invoked by reflection with the parsed arguments, and the result is written back to the client through Netty.
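Roughly, and purely as a hypothetical sketch (the real logic is split between NiftyDispatcher and ThriftServiceProcessor, which later posts will cover), one request/response cycle amounts to:

import java.lang.reflect.Method;
import java.util.Map;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TProtocol;

// Hypothetical, heavily simplified dispatcher; method lookup, argument decoding
// and response encoding are what ThriftServiceProcessor and ThriftCodecManager do for real.
public abstract class DispatchSketch {
    private final Map<String, Method> methods; // method name -> reflective handle, built from the annotated interface
    private final Object service;              // e.g. an EchoServiceImpl instance

    protected DispatchSketch(Map<String, Method> methods, Object service) {
        this.methods = methods;
        this.service = service;
    }

    public void handle(TProtocol in, TProtocol out) throws Exception {
        TMessage header = in.readMessageBegin();      // message header: name, type, seqid
        Method method = methods.get(header.name);     // locate the target method by name
        Object[] args = readArguments(in, method);    // decode the arguments from the protocol
        Object result = method.invoke(service, args); // reflective call into the user implementation
        writeResponse(out, header, result);           // encode the result; Netty flushes it to the client
    }

    // Left abstract to keep the sketch short: in Nifty these steps go through the thrift codecs.
    protected abstract Object[] readArguments(TProtocol in, Method method) throws Exception;
    protected abstract void writeResponse(TProtocol out, TMessage header, Object result) throws Exception;
}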
Let's start with how the ThriftServer is created and started to get a sense of the overall flow, and come back to the individual components afterwards.
ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));

public ThriftServer(NiftyProcessor processor, ThriftServerConfig config)
{
    this(processor, config, new NiftyTimer("thrift"));
}
Following the chain down, we reach the final constructor.
public ThriftServer(
        final NiftyProcessor processor,
        ThriftServerConfig config,
        @ThriftServerTimer Timer timer,
        Map<String, ThriftFrameCodecFactory> availableFrameCodecFactories,
        Map<String, TDuplexProtocolFactory> availableProtocolFactories,
        @ThriftServerWorkerExecutor Map<String, ExecutorService> availableWorkerExecutors,
        NiftySecurityFactoryHolder securityFactoryHolder)
{
    NiftyProcessorFactory processorFactory = new NiftyProcessorFactory()
    {
        @Override
        public NiftyProcessor getProcessor(TTransport transport)
        {
            return processor;
        }
    };

    String transportName = config.getTransportName();
    String protocolName = config.getProtocolName();

    checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName);
    checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName);

    configuredPort = config.getPort();

    workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);

    acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
    acceptorThreads = config.getAcceptorThreadCount();
    ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
    ioThreads = config.getIoThreadCount();

    serverChannelFactory = new NioServerSocketChannelFactory(
            new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT),
            new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT));

    ThriftServerDef thriftServerDef = ThriftServerDef.newBuilder()
            .name("thrift")
            .listen(configuredPort)
            .limitFrameSizeTo((int) config.getMaxFrameSize().toBytes())
            .clientIdleTimeout(config.getIdleConnectionTimeout())
            .withProcessorFactory(processorFactory)
            .limitConnectionsTo(config.getConnectionLimit())
            .limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection())
            .thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName))
            .protocol(availableProtocolFactories.get(protocolName))
            .withSecurityFactory(securityFactoryHolder.niftySecurityFactory)
            .using(workerExecutor)
            .taskTimeout(config.getTaskExpirationTimeout())
            .withSecurityFactory(config.getSecurityFactory())
            .withHeader(config.getHeader())
            .withServerHandler(config.getNiftyServerHandler())
            .build();

    NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder();

    nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog());
    nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount());
    nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount());
    nettyServerConfigBuilder.setTimer(timer);

    NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build();

    transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
}
In short, the constructor does two things: it turns the ThriftServerConfig into a ThriftServerDef plus a NettyServerConfig (building the required thread pools along the way), and it uses those to create the NettyServerTransport.
Now let's look at some of the finer details and think through a few questions; feel free to skip this part if you are not interested.
Creating the worker thread pool workerExecutor:
workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);
The availableWorkerExecutors map passed in is empty by default, so a new thread pool ends up being built. The method ultimately called is makeDefaultWorkerExecutor; the code below is slightly simplified.
private ExecutorService makeDefaultWorkerExecutor()
{
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    return new ThreadPoolExecutor(
            getWorkerThreads(),
            getWorkerThreads(),
            0L,
            TimeUnit.MILLISECONDS,
            queue,
            new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(),
            new ThreadPoolExecutor.AbortPolicy());
}
Creating Netty's acceptor thread pool and I/O thread pool:
acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
acceptorThreads = config.getAcceptorThreadCount();
ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
ioThreads = config.getIoThreadCount();
As for building the Netty configuration class NettyServerConfig via nettyServerConfigBuilder.build(): the boss thread count, the I/O (worker) thread count, and the timer were set beforehand, and calling build() looks like this:
public NettyServerConfig build()
{
    Timer timer = getTimer();
    ExecutorService bossExecutor = getBossExecutor();
    int bossThreadCount = getBossThreadCount();
    ExecutorService workerExecutor = getWorkerExecutor();
    int workerThreadCount = getWorkerThreadCount();

    return new NettyServerConfig(
            getbootstrapOptions(),
            timer != null ? timer : new NiftyTimer(threadNamePattern("")),
            bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(),
            bossThreadCount,
            workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(),
            workerThreadCount);
}

private ExecutorService buildDefaultBossExecutor()
{
    return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-boss-%s")));
}

private ExecutorService buildDefaultWorkerExecutor()
{
    return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-worker-%s")));
}
Since neither executor was set, the default ones are built; note that one is the boss pool and the other is the worker pool. The timer will be used several more times later, when the Netty handlers are constructed.
allChannels is a ChannelGroup provided by Netty:
private final DefaultChannelGroup allChannels = new DefaultChannelGroup();
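A ChannelGroup tracks a set of channels so they can be operated on together. A typical use, sketched hypothetically below (the class and method names are made up for illustration), is closing every open connection on shutdown:

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;

// Hypothetical helper showing why NettyServerTransport keeps all channels in one group.
public class ConnectionTracker {
    private final ChannelGroup allChannels = new DefaultChannelGroup();

    public void register(Channel channel) {
        allChannels.add(channel);   // each accepted connection gets registered
    }

    public void shutdown() {
        // One call closes every tracked connection and waits for all closes to finish.
        allChannels.close().awaitUninterruptibly();
    }
}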
Before going further, let's review how a Netty server is created, because this project uses Netty 3, which differs quite a bit from the Netty 4 most of us are familiar with; a Netty 4 equivalent is sketched right after the snippet for comparison.
ChannelFactory factory = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(),
        Executors.newCachedThreadPool());

ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("handler1", new Handler1());
        return pipeline;
    }
});
Channel channel = bootstrap.bind(new InetSocketAddress(8081));
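For comparison, the equivalent skeleton in Netty 4 (a minimal sketch, not code from this project; Handler1 is the same placeholder as above) replaces the ChannelFactory/ChannelPipelineFactory pair with EventLoopGroups and a ChannelInitializer:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Netty4ServerSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles I/O
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast("handler1", new Handler1());
                        }
                    });
            bootstrap.bind(8081).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}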
When the NettyServerTransport is built, its constructor performs the Netty setup.
transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
It mostly just assigns member fields; the last and most important step is building the ChannelPipelineFactory, which installs the various handlers. Most of them extend SimpleChannelUpstreamHandler and a few implement ChannelDownstreamHandler, which you can think of as Netty 4's ChannelInboundHandler and ChannelOutboundHandler respectively (a minimal upstream-handler example follows the skeleton below).
this.pipelineFactory = new ChannelPipelineFactory()
{
    @Override
    public ChannelPipeline getPipeline() throws Exception
    {
        ChannelPipeline cp = Channels.pipeline();
        // install the handlers here
        return cp;
    }
};
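For reference, a Netty 3 upstream handler looks like this; a minimal hypothetical example, not one of Nifty's actual handlers:

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

// Equivalent in spirit to a Netty 4 ChannelInboundHandler: it sees events
// travelling "upstream", i.e. from the socket towards the application.
public class LoggingHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("received: " + e.getMessage());
        // Pass the event on to the next upstream handler in the pipeline.
        super.messageReceived(ctx, e);
    }
}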
Let's look at which handlers are installed; the full pipeline construction is shown below, and the handler names (connectionContext, connectionLimiter, the channel statistics handler, frameCodec, the idle-timeout handlers, dispatcher, exceptionLogger) already give a good idea of each one's role.
public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels)
{
    this.def = def;
    this.nettyServerConfig = nettyServerConfig;
    this.port = def.getServerPort();
    this.allChannels = allChannels;

    final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections());

    this.channelStatistics = new ChannelStatistics(allChannels);

    this.pipelineFactory = new ChannelPipelineFactory()
    {
        @Override
        public ChannelPipeline getPipeline() throws Exception
        {
            ChannelPipeline cp = Channels.pipeline();
            TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory();
            NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig);
            cp.addLast("connectionContext", new ConnectionContextHandler());
            cp.addLast("connectionLimiter", connectionLimiter);
            cp.addLast(ChannelStatistics.NAME, channelStatistics);
            cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(), inputProtocolFactory));
            if (def.getClientIdleTimeout() != null) {
                // Add handlers to detect idle client connections and disconnect them
                cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(),
                        def.getClientIdleTimeout().toMillis(),
                        NO_WRITER_IDLE_TIMEOUT,
                        NO_ALL_IDLE_TIMEOUT,
                        TimeUnit.MILLISECONDS));
                cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler());
            }

            cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
            cp.addLast("exceptionLogger", new NiftyExceptionLogger());
            return cp;
        }
    };
}
Now let's go back to server.start() and see how the ThriftServer starts.
ThriftServer: the initial state is NOT_STARTED.
public synchronized ThriftServer start()
{
    checkState(state != State.CLOSED, "Thrift server is closed");
    if (state == State.NOT_STARTED) {
        transport.start(serverChannelFactory);
        state = State.RUNNING;
    }
    return this;
}
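In application code the start/stop lifecycle typically looks something like the sketch below; it assumes the server exposes a close() method as the counterpart of the CLOSED state checked above.

// Hypothetical bootstrap showing the lifecycle around start().
final ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));
server.start();

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        server.close();   // assumed shutdown call: move to CLOSED and release the Netty resources
    }
});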
NettyServerTransport: a standard Netty server startup, where pipelineFactory is the one created earlier in the NettyServerTransport constructor.
public void start(ServerChannelFactory serverChannelFactory)
{
    bootstrap = new ServerBootstrap(serverChannelFactory);
    bootstrap.setOptions(nettyServerConfig.getBootstrapOptions());
    bootstrap.setPipelineFactory(pipelineFactory);
    serverChannel = bootstrap.bind(new InetSocketAddress(port));
    // ...
}
That covers the startup of the Thrift server. The next part will walk through how the server receives data, processes it, and writes the response back to the client.
Author: 可以回家加班嗎
Link: https://juejin.im/post/6879786471901888525
Source: Juejin (掘金)
Copyright belongs to the author. For commercial reproduction, please contact the author for authorization; for non-commercial reproduction, please credit the source.