搭建伺服器處理系統(基於Netty)到底能走多遠?

2020-10-11 smile蒲公英QQ

扯淡:

  現在我算是進入一個能帶著你向前走的團隊,但是產品設計太扯淡,網際網路應用,開發周期異常的短,也高估了開發的能力,趕進度的開發bug很多啊。

  如果開發只是完成任務的行動,是不會感到痛苦的。所以說:想要做好產品的開發,痛苦才剛剛開始。原因就是開發無法左右產品的設計.....

主題:

  時刻關注排行的朋友注意啦,你們都得了排行強迫症啦,趕快找個護士就醫吧。

  一個排行.....(我需要護士)

好吧,據說netty排在第一,那就學習以下吧!

更具公司很久以前的一個伺服器框架代碼,不斷刪減,然後得到一個很簡單的伺服器框架,分享一下。

自己畫的流程圖,流程比較簡單,這方面比較弱,不太會用圖表達:

1,啟用netty

我們需要監聽埠,這樣就可以處理連接上來的tcp消息了。這一步netty用java 的NIO和OIO都封裝了,我們自然選擇NIO了。

一下是啟動伺服器的代碼:對於這個啟動,你只要學習一下netty的手冊例子,就馬上明白了,它手冊的例子也很好,建議大家看看。

public class Start { public static void main(String[] args) { //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); System.out.println("=============show time!============="); initNetty(); } private static final int tcpSendBufferSize = 32768; private static final int tcpReceiveBufferSize = 32768; // 初始化埠的監聽 public static void initNetty(){ InetSocketAddress addr = new InetSocketAddress(8989);//需要監聽的埠,即tcp連接建立的埠 //Executors.newCachedThreadPool()的解釋: //緩衝線程執行器,產生一個大小可變的線程池。 //當線程池的線程多於執行任務所需要的線程的時候,對空閒線程(即60s沒有任務執行)進行回收; //當執行任務的線程數不足的時候,自動拓展線程數量。因此線程數量是JVM可創建線程的最大數目。 ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. // Creates a new group with a generated name. DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup"); Serverbootstrap bootstrap = new ServerBootstrap(channelFactory); // PushServerPipelineFactory作為一個ChannelPipelineFactory產生的工廠類,我們可以把需要執行的Handler進行配置 ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels); // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. // 伺服器新連接建立的時候,新的ChannelPipeline會通過我們定義的ChannelPipelineFactory產生,其實是調用了getPipeline()方法。 bootstrap.setPipelineFactory(pipelineFactory); if (tcpReceiveBufferSize != -1) { bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize); } if (tcpSendBufferSize != -1) { bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize); } bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.reuseAddress", true); bootstrap.setOption("child.keepAlive", false); bootstrap.setOption("child.tcpNoDelay", true); System.out.println(" ===================netty started====================="); Channel serverChannel = bootstrap.bind(addr); allChannels.add(serverChannel); }

PushServerPipelineFactory 其實就是配置了一下Handler,他叫pushServerCommandHandler,他的作用就是把接收到的信息放進叫receivedQueen的隊列去就好了,其實就是調用了MessageManager的addSocketMessage方法。

我們看一下他的messageReceived方法就明白了,netty是事件機制的,messageReceived是重寫的方法,只要是受到一個連接的消息,就會觸發這個方法。

public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception { CommandMessage command = (CommandMessage) messageEvent.getMessage(); if (command.message.length() > 3) { Channel ch = channelHandlerContext.getChannel(); ch.write("---------message received-------------"); // 向消息隊列裡插消息包,通過handleMessage這個方法, // 插入的MessagePack其實已經更具消息的不同被選擇成不同的子類 // 我覺得這是很關鍵的設計,我們的業務邏輯就可以分成不同的MessagePack子類,然後實現它的onHandler方法 messageManager.addSocketMessage(handleMessage(command.message, messageEvent)); } else { // logger.warn("too short message."); } } //重點方法 public MessagePack handleMessage(String msg, MessageEvent e) { MessagePack messagePack = null; int fid = SjsonUtil.getFIDFromMsg(msg); switch (fid) { case 25: // 調用TestCategoryMsg messagePack = new ShowTimeMessage(msg, e.getChannel()); break; case 26: // 調用不同的業務邏輯 messagePack = new TestCategoryMsg(msg, e.getChannel()); break; default: // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); } return messagePack; }

PushServerPipelineFactory 除了配置好Handler,還把MessageManager啟動起來了,MessageManager是spring的配置文件中配置的。注意他的init-method,就是實例化這個bean的時候會執行它的start方法,這個比較重要,因為MessageManager就是處理消息隊列的模塊,所以他需要在伺服器啟動時啟動線程池去處理消息隊列。MessageManager提供的方法就是用來維護一個叫receivedQueen的隊列。

<bean id="messageManager" class="netty.gate.message.MessageManager" init-method="start"> </bean>

PushServerPipelineFactory

public class Start { public static void main(String[] args) { //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); System.out.println("=============show time!============="); initNetty(); } private static final int tcpSendBufferSize = 32768; private static final int tcpReceiveBufferSize = 32768; // 初始化埠的監聽 public static void initNetty(){ InetSocketAddress addr = new InetSocketAddress(8989);//需要監聽的埠,即tcp連接建立的埠 //Executors.newCachedThreadPool()的解釋: //緩衝線程執行器,產生一個大小可變的線程池。 //當線程池的線程多於執行任務所需要的線程的時候,對空閒線程(即60s沒有任務執行)進行回收; //當執行任務的線程數不足的時候,自動拓展線程數量。因此線程數量是JVM可創建線程的最大數目。 ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. // Creates a new group with a generated name. DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup"); ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); // PushServerPipelineFactory作為一個ChannelPipelineFactory產生的工廠類,我們可以把需要執行的Handler進行配置 ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels); // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. // 伺服器新連接建立的時候,新的ChannelPipeline會通過我們定義的ChannelPipelineFactory產生,其實是調用了getPipeline()方法。 bootstrap.setPipelineFactory(pipelineFactory); if (tcpReceiveBufferSize != -1) { bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize); } if (tcpSendBufferSize != -1) { bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize); } bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.reuseAddress", true); bootstrap.setOption("child.keepAlive", false); bootstrap.setOption("child.tcpNoDelay", true); System.out.println(" ===================netty started====================="); Channel serverChannel = bootstrap.bind(addr); allChannels.add(serverChannel); }

很關鍵的MessageManager: 維護的是receivedQueen隊列

public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception { CommandMessage command = (CommandMessage) messageEvent.getMessage(); if (command.message.length() > 3) { Channel ch = channelHandlerContext.getChannel(); ch.write("---------message received-------------"); // 向消息隊列裡插消息包,通過handleMessage這個方法, // 插入的MessagePack其實已經更具消息的不同被選擇成不同的子類 // 我覺得這是很關鍵的設計,我們的業務邏輯就可以分成不同的MessagePack子類,然後實現它的onHandler方法 messageManager.addSocketMessage(handleMessage(command.message, messageEvent)); } else { // logger.warn("too short message."); } } //重點方法 public MessagePack handleMessage(String msg, MessageEvent e) { MessagePack messagePack = null; int fid = SjsonUtil.getFIDFromMsg(msg); switch (fid) { case 25: // 調用TestCategoryMsg messagePack = new ShowTimeMessage(msg, e.getChannel()); break; case 26: // 調用不同的業務邏輯 messagePack = new TestCategoryMsg(msg, e.getChannel()); break; default: // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); } return messagePack; }

正真的處理邏輯的代碼是寫在這些繼承MessagePack抽象類裡的,裡面的一個onHandler方法是必須實現的,所以使用了抽象類

下面代碼的onHandler中,就可以寫那些調用service層的,處理資料庫,發郵件,調用接口啊等各種需求操作了。

public class TestCategoryMsg extends MessagePack { private static final String MESSAGE_NAME = "TEST_MESSAGE"; // 消息名稱 public TestCategoryMsg(String msg, Channel channel) { super(msg, channel); } @Override public void onHandler(ApplicationContext ctx) { channel.write("---------------i dont know why--------------"); } public String getName() { return MESSAGE_NAME; }}

到此基本上一個伺服器從接受數據,到回應數據的流程已經走完了。

相關焦點

  • 適合初中級Java程式設計師修煉手冊從0搭建整個Web項目(一)
    Http請求【基於Netty的請求級Web伺服器】 到mvc【接口封裝轉發)】,再到ioc【依賴注入】,aop【切面】,再到 rpc【遠程過程調用】最後到orm【資料庫操作】全部自己擼一個(簡易)的輪子。
  • 手寫SpringBoot:基於Netty搭建一個HTTP S
    實現消息推送系統 :市面上有很多消息推送系統都是基於 Netty 來做的。......Chunked transfer encoding)是超文本傳輸協議(HTTP)中的一種數據傳輸機制(HTTP/1.1 才有),允許 HTTP 由應用伺服器發送給客戶端應用( 通常是網頁瀏覽器)的數據可以分成多「塊」(數據量比較大的情況)。
  • 幾十行代碼基於Netty搭建一個 HTTP Server
    相關項目: (仿 Spring Boot 但不同於 Spring Boot 的一個輕量級的 HTTP 框架)目前正在寫的一個叫做 的輕量級 HTTP 框架內置的 HTTP 伺服器是我自己基於 Netty 寫的,所有的核心代碼加起來不過就幾十行。這得益於 Netty 提供的各種開箱即用的組件,為我們節省了太多事情。
  • RocketMQ篇11:基於Netty的通信實現
    Netty提供了集中擁有相同編程接口的基本傳輸實現:基於NIO的TCP/IP傳輸:io.netty.channel.nio基於OIO的TCP/IP傳輸:io.netty.channel.oio基於OIO的UDP/IP傳輸:io.netty.channel.oio本地傳輸:io.netty.channel.localNetty用法示例1、Discard伺服器
  • Netty實現高性能RPC伺服器優化篇之消息序列化
    有關如何利用Netty開發實現,高性能RPC伺服器的一些設計思路、設計原理,以及具體的實現方案。在文章的最後提及到,其實基於該方案設計的RPC伺服器的處理性能,還有優化的餘地。3、利用第三方編解碼框架(Kryo、Hessian)的時候,考慮到高並發的場景下,頻繁的創建、銷毀序列化對象,會非常消耗JVM的內存資源,影響整個RPC伺服器的處理性能,因此引入對象池化(Object Pooling)技術。眾所周知,創建新對象並初始化,可能會消耗很多的時間。當需要產生大量對象的時候,可能會對性能造成一定的影響。
  • 「源碼學習」基於Netty的大文件斷點續傳開源解決方案
    今天 Gitee 推薦的這款開源項目就是基於 Netty 實現的大文件分塊上傳斷點續傳解決方案,一起來學習吧。項目名稱:fileex項目作者:gjj項目地址:https://gitee.com/gaojunjie03/fileex項目簡介一款基於 netty、http1.1 transfer-encoding
  • 徹底搞懂 Netty 線程模型
    (handler)流程:Read request (接收二進位數據)Decode request (解碼為可讀數據)Process service (對數據進行處理產生結果)Encode reply (將結果編碼為二進位數據)Send reply (返回結果)
  • 該試試Netty了
    Netty目前的項目leader是德國人Norman maurer(之前在Redhat,全職開發Netty),也是《Netty in action》的作者,目前是蘋果公司高級工程師,同時也經常參加netty相關的技術會議,這兩大牛長下面這樣:
  • netty快速入門教程
    什麼是nettyNetty 是一個提供 asynchronous event-driven (異步事件驅動)的網絡應用框架,是一個用以快速開發高性能、高可靠性協議的伺服器和客戶端。換句話說,Netty 是一個 NIO 客戶端伺服器框架,使用它可以快速簡單地開發網絡應用程式,比如伺服器和客戶端的協議。
  • 十天搭建一套前端監控系統(二) 搭建阿里雲伺服器
    ,並不像你在本地運行那麼簡單,只需要一句node index.js名利就可以,你需要一個載體來搭建你的雲計算服務,也就是雲伺服器,才能夠讓監控系統運用在任何地方。對於前端小白來說,整一套完整的伺服器,可能需要經歷許多周折(大牛忽略),不過也不用擔心,且看我一步一步道來。=================================訪問:www.webfunny.cn ;只需簡單幾步,就可以搭建一套屬於自己的前端監控系統。
  • 進阿里、騰訊、字節跳動、美團必掌握的Netty
    Netty的特點:高並發:Netty 是一款基於 NIO(Nonblocking IO,非阻塞IO)開發的網絡通信框架,對比於 BIO(Blocking I/O,阻塞IO),他的並發性能得到了很大提高。傳輸快:Netty 的傳輸依賴於零拷貝特性,儘量減少不必要的內存拷貝,實現了更高效率的傳輸。
  • 華為雲手機能走多遠?能否成為晶片問題的終極解決方案?
    華為雲手機能走多遠?能否成為晶片問題的終極解決方案?相信很多關心華為處境的國人,都想知道這個問題的答案。在華為雲手機的官方網站上,華為是這樣定義「雲手機」的:雲手機(Cloud Phone,簡稱CPH),是基於華為雲裸金屬伺服器虛擬出的帶有原生安卓作業系統,同時具有虛擬手機功能的雲伺服器。
  • 雲伺服器能搭建網站嗎
    雲伺服器能搭建網站嗎?雲伺服器能搭建網站的,從中小型網站到大型網站,雲伺服器均能搭建。雲伺服器可以搭建不同的環境,來滿足不同網站需求。雲伺服器搭建網站有哪些優勢?雲伺服器,擁有獨立IP、獨立空間;伺服器配置自己選擇,滿足各類需求;雲計算架構,保障伺服器性能與安全等。雲伺服器搭建網站步驟是怎樣的?首先到西部數碼的雲伺服器產品頁面,選擇好各項配置(vcpu、內存、帶寬、作業系統等),支付費用,即可購買好一臺雲伺服器。
  • 如何搭建屬於自己的監控系統之阿里雲伺服器搭建篇
    (本地部署類似)  對於之前沒有搞過伺服器的前端來說,折騰一個能運行的伺服器,還真得費了不少周折。  伺服器類型: 入門級(共享) 2vCPU 4GB內存 帶寬2M(個人建議2vCpu 2GB內存)  安裝運行環境:1. 系統: Ubuntu 16 64位 硬碟40G (默認)2.
  • 如何基於NLP(自然語言處理)技術構建應用系統?
    自然語言處理(NLP)目前在技術上相對已經比較成熟了,同時在我們的日常生活中,也有很多的應用,通過一些基礎的詞幹提取、詞性標註、句法分析,可以簡單的實現文本糾錯、文章標籤、文章分類等基礎能力外,還能夠實現一些高級應用。
  • 八、Netty入門服務端代碼
    如果能深入理解以上兩篇文章,那麼後續對於Netty的學習和理解Netty的Reactor會非常簡單。今天我們先通過入門的角度從Idea中導入Netty包,並且使用Netty構建一個可用於接受數據的服務端。
  • 雲伺服器能搭建虛擬機嗎
    雲伺服器能搭建虛擬機嗎?理論上來講,雲伺服器上是可以搭建虛擬機的。但因為雲伺服器本身已經虛擬化一次了,現在的主流雲服務商為了保證自身產品的安全穩定性,基本會在自己的雲伺服器上限制進行二次搭建虛擬機。比如,購買一臺 windows 雲伺服器,然後在裡面安裝 VMware 虛擬化軟體。
  • Netty從入門到禿頭:websocket
    --netty的依賴集合,都整合在一個依賴裡面了--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version>
  • 基於Docker搭建網際網路伺服器
    基於Docker搭建網際網路伺服器背景利用Docker技術分別搭建起來一個HTTP伺服器和一個FTP伺服器, 同一區域網內(例如機房)其他計算機通過瀏覽器或客戶端可以訪問這臺伺服器, 可以下載各種格式的文件
  • 史上最通俗Netty入門長文:基本介紹、環境搭建、動手實戰
    2)NIO:一個請求一個線程,客戶端發送的連接請求會註冊到多路復用器上,多路復用器輪詢到該連接有I/O請求時才啟動一個線程進行處理;3)AIO:一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知伺服器應用去啟動線程進行處理。