扯淡:
現在我算是進入一個能帶著你向前走的團隊,但是產品設計太扯淡,網際網路應用,開發周期異常的短,也高估了開發的能力,趕進度的開發bug很多啊。
如果開發只是完成任務的行動,是不會感到痛苦的。所以說:想要做好產品的開發,痛苦才剛剛開始。原因就是開發無法左右產品的設計.....
主題:
時刻關注排行的朋友注意啦,你們都得了排行強迫症啦,趕快找個護士就醫吧。
一個排行.....(我需要護士)
好吧,據說netty排在第一,那就學習以下吧!
更具公司很久以前的一個伺服器框架代碼,不斷刪減,然後得到一個很簡單的伺服器框架,分享一下。
自己畫的流程圖,流程比較簡單,這方面比較弱,不太會用圖表達:
1,啟用netty
我們需要監聽埠,這樣就可以處理連接上來的tcp消息了。這一步netty用java 的NIO和OIO都封裝了,我們自然選擇NIO了。
一下是啟動伺服器的代碼:對於這個啟動,你只要學習一下netty的手冊例子,就馬上明白了,它手冊的例子也很好,建議大家看看。
public class Start { public static void main(String[] args) { //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); System.out.println("=============show time!============="); initNetty(); } private static final int tcpSendBufferSize = 32768; private static final int tcpReceiveBufferSize = 32768; // 初始化埠的監聽 public static void initNetty(){ InetSocketAddress addr = new InetSocketAddress(8989);//需要監聽的埠,即tcp連接建立的埠 //Executors.newCachedThreadPool()的解釋: //緩衝線程執行器,產生一個大小可變的線程池。 //當線程池的線程多於執行任務所需要的線程的時候,對空閒線程(即60s沒有任務執行)進行回收; //當執行任務的線程數不足的時候,自動拓展線程數量。因此線程數量是JVM可創建線程的最大數目。 ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. // Creates a new group with a generated name. DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup"); Serverbootstrap bootstrap = new ServerBootstrap(channelFactory); // PushServerPipelineFactory作為一個ChannelPipelineFactory產生的工廠類,我們可以把需要執行的Handler進行配置 ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels); // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. // 伺服器新連接建立的時候,新的ChannelPipeline會通過我們定義的ChannelPipelineFactory產生,其實是調用了getPipeline()方法。 bootstrap.setPipelineFactory(pipelineFactory); if (tcpReceiveBufferSize != -1) { bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize); } if (tcpSendBufferSize != -1) { bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize); } bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.reuseAddress", true); bootstrap.setOption("child.keepAlive", false); bootstrap.setOption("child.tcpNoDelay", true); System.out.println(" ===================netty started====================="); Channel serverChannel = bootstrap.bind(addr); allChannels.add(serverChannel); }
PushServerPipelineFactory 其實就是配置了一下Handler,他叫pushServerCommandHandler,他的作用就是把接收到的信息放進叫receivedQueen的隊列去就好了,其實就是調用了MessageManager的addSocketMessage方法。
我們看一下他的messageReceived方法就明白了,netty是事件機制的,messageReceived是重寫的方法,只要是受到一個連接的消息,就會觸發這個方法。
public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception { CommandMessage command = (CommandMessage) messageEvent.getMessage(); if (command.message.length() > 3) { Channel ch = channelHandlerContext.getChannel(); ch.write("---------message received-------------"); // 向消息隊列裡插消息包,通過handleMessage這個方法, // 插入的MessagePack其實已經更具消息的不同被選擇成不同的子類 // 我覺得這是很關鍵的設計,我們的業務邏輯就可以分成不同的MessagePack子類,然後實現它的onHandler方法 messageManager.addSocketMessage(handleMessage(command.message, messageEvent)); } else { // logger.warn("too short message."); } } //重點方法 public MessagePack handleMessage(String msg, MessageEvent e) { MessagePack messagePack = null; int fid = SjsonUtil.getFIDFromMsg(msg); switch (fid) { case 25: // 調用TestCategoryMsg messagePack = new ShowTimeMessage(msg, e.getChannel()); break; case 26: // 調用不同的業務邏輯 messagePack = new TestCategoryMsg(msg, e.getChannel()); break; default: // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); } return messagePack; }
PushServerPipelineFactory 除了配置好Handler,還把MessageManager啟動起來了,MessageManager是spring的配置文件中配置的。注意他的init-method,就是實例化這個bean的時候會執行它的start方法,這個比較重要,因為MessageManager就是處理消息隊列的模塊,所以他需要在伺服器啟動時啟動線程池去處理消息隊列。MessageManager提供的方法就是用來維護一個叫receivedQueen的隊列。
<bean id="messageManager" class="netty.gate.message.MessageManager" init-method="start"> </bean>
PushServerPipelineFactory:
public class Start { public static void main(String[] args) { //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); System.out.println("=============show time!============="); initNetty(); } private static final int tcpSendBufferSize = 32768; private static final int tcpReceiveBufferSize = 32768; // 初始化埠的監聽 public static void initNetty(){ InetSocketAddress addr = new InetSocketAddress(8989);//需要監聽的埠,即tcp連接建立的埠 //Executors.newCachedThreadPool()的解釋: //緩衝線程執行器,產生一個大小可變的線程池。 //當線程池的線程多於執行任務所需要的線程的時候,對空閒線程(即60s沒有任務執行)進行回收; //當執行任務的線程數不足的時候,自動拓展線程數量。因此線程數量是JVM可創建線程的最大數目。 ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. // Creates a new group with a generated name. DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup"); ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); // PushServerPipelineFactory作為一個ChannelPipelineFactory產生的工廠類,我們可以把需要執行的Handler進行配置 ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels); // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. // 伺服器新連接建立的時候,新的ChannelPipeline會通過我們定義的ChannelPipelineFactory產生,其實是調用了getPipeline()方法。 bootstrap.setPipelineFactory(pipelineFactory); if (tcpReceiveBufferSize != -1) { bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize); } if (tcpSendBufferSize != -1) { bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize); } bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.reuseAddress", true); bootstrap.setOption("child.keepAlive", false); bootstrap.setOption("child.tcpNoDelay", true); System.out.println(" ===================netty started====================="); Channel serverChannel = bootstrap.bind(addr); allChannels.add(serverChannel); }
很關鍵的MessageManager: 維護的是receivedQueen隊列
public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception { CommandMessage command = (CommandMessage) messageEvent.getMessage(); if (command.message.length() > 3) { Channel ch = channelHandlerContext.getChannel(); ch.write("---------message received-------------"); // 向消息隊列裡插消息包,通過handleMessage這個方法, // 插入的MessagePack其實已經更具消息的不同被選擇成不同的子類 // 我覺得這是很關鍵的設計,我們的業務邏輯就可以分成不同的MessagePack子類,然後實現它的onHandler方法 messageManager.addSocketMessage(handleMessage(command.message, messageEvent)); } else { // logger.warn("too short message."); } } //重點方法 public MessagePack handleMessage(String msg, MessageEvent e) { MessagePack messagePack = null; int fid = SjsonUtil.getFIDFromMsg(msg); switch (fid) { case 25: // 調用TestCategoryMsg messagePack = new ShowTimeMessage(msg, e.getChannel()); break; case 26: // 調用不同的業務邏輯 messagePack = new TestCategoryMsg(msg, e.getChannel()); break; default: // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); } return messagePack; }
正真的處理邏輯的代碼是寫在這些繼承MessagePack的抽象類裡的,裡面的一個onHandler方法是必須實現的,所以使用了抽象類。
下面代碼的onHandler中,就可以寫那些調用service層的,處理資料庫,發郵件,調用接口啊等各種需求操作了。
public class TestCategoryMsg extends MessagePack { private static final String MESSAGE_NAME = "TEST_MESSAGE"; // 消息名稱 public TestCategoryMsg(String msg, Channel channel) { super(msg, channel); } @Override public void onHandler(ApplicationContext ctx) { channel.write("---------------i dont know why--------------"); } public String getName() { return MESSAGE_NAME; }}
到此基本上一個伺服器從接受數據,到回應數據的流程已經走完了。