使用Netty如何做到單機秒級接收35萬個對象

2020-10-10 瀟灑的程式設計師

單純netty結合protostuff進行rpc對象傳輸的demo網上有很多,大部分都是一個模子刻出來的,一開始我也是抄了一個,本地測試暢通無阻,未發生任何異常。

部署預發環境,進行壓測後,問題巨多,各種報錯層出不窮。當然,壓測時我用的數據量大、發送請求非常密集,單機是每秒前100ms發送2萬個對象,其他900ms歇息,死循環發送,共計40臺機器作為客戶端,同時往2臺netty Server伺服器發送對象,那麼平均每個server每秒大概要接收40萬個對象,由於後面還有業務邏輯,邏輯每秒只能處理35萬實測。

對於網上的代碼,進行了多次修改,反覆測試,最終是達到了不報錯無異常,單機秒級接收35萬個對象以上,故寫篇文章記錄一下,文中代碼會和線上邏輯保持一致。

Protostuff序列化和反序列化

這個沒什麼特殊的,網上找個工具類就好了。

引入pom

public class ProtostuffUtils {    /**     * 避免每次序列化都重新申請Buffer空間     * 這句話在實際生產上沒有意義,耗時減少的極小,但高並發下,如果還用這個buffer,會報異常說buffer還沒清空,就又被使用了     *///    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);    /**     * 緩存Schema     */    private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();     /**     * 序列化方法,把指定對象序列化成字節數組     *     * @param obj     * @param <T>     * @return     */    @SuppressWarnings("unchecked")    public static <T> byte[] serialize(T obj) {        Class<T> clazz = (Class<T>) obj.getClass();        Schema<T> schema = getSchema(clazz);        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);        byte[] data;        try {            data = ProtobufIOUtil.toByteArray(obj, schema, buffer);//            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);        } finally {            buffer.clear();        }         return data;    }     /**     * 反序列化方法,將字節數組反序列化成指定Class類型     *     * @param data     * @param clazz     * @param <T>     * @return     */    public static <T> T deserialize(byte[] data, Class<T> clazz) {        Schema<T> schema = getSchema(clazz);        T obj = schema.newMessage();        ProtobufIOUtil.mergeFrom(data, obj, schema);//        ProtostuffIOUtil.mergeFrom(data, obj, schema);        return obj;    }     @SuppressWarnings("unchecked")    private static <T> Schema<T> getSchema(Class<T> clazz) {        Schema<T> schema = (Schema<T>) schemaCache.get(clazz);        if (Objects.isNull(schema)) {            //這個schema通過RuntimeSchema進行懶創建並緩存            //所以可以一直調用RuntimeSchema.getSchema(),這個方法是線程安全的            schema = RuntimeSchema.getSchema(clazz);            if (Objects.nonNull(schema)) {                schemaCache.put(clazz, schema);            }        }         return schema;    }}

此處有坑,就是最上面大部分網上代碼都是用了static的buffer。在單線程情況下沒有問題。在多線程情況下,非常容易出現buffer一次使用後尚未被clear,就再次被另一個線程使用,會拋異常。而所謂的避免每次都申請buffer空間,實測性能影響極其微小。

另裡面兩次ProtostuffIOUtil都改成了ProtobufIOUtil,因為也是出過異常,修改後未見有異常。

自定義序列化方式

解碼器decoder:

import com.jd.platform.hotkey.common.model.HotKeyMsg;import com.jd.platform.hotkey.common.tool.ProtostuffUtils;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * @author wuweifeng * @version 1.0 * @date 2020-07-29 */public class MsgDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) {        try {             byte[] body = new byte[in.readableBytes()];  //傳輸正常            in.readBytes(body);             list.add(ProtostuffUtils.deserialize(body, HotKeyMsg.class)); //            if (in.readableBytes() < 4) {//                return;//            }//            in.markReaderIndex();//            int dataLength = in.readInt();//            if (dataLength < 0) {//                channelHandlerContext.close();//            }//            if (in.readableBytes() < dataLength) {//                in.resetReaderIndex();//                return;//            }////            byte[] data = new byte[dataLength];//            in.readBytes(data);////            Object obj = ProtostuffUtils.deserialize(data, HotKeyMsg.class);//            list.add(obj);        } catch (Exception e) {            e.printStackTrace();        }    }}

編碼器 encoder

import com.jd.platform.hotkey.common.model.HotKeyMsg;import com.jd.platform.hotkey.common.tool.Constant;import com.jd.platform.hotkey.common.tool.ProtostuffUtils;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder; /** * @author wuweifeng * @version 1.0 * @date 2020-07-30 */public class MsgEncoder extends MessageToByteEncoder {     @Override    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {        if (in instanceof HotKeyMsg) {            byte[] bytes = ProtostuffUtils.serialize(in);            byte[] delimiter = Constant.DELIMITER.getBytes();             byte[] total = new byte[bytes.length + delimiter.length];            System.arraycopy(bytes, 0, total, 0, bytes.length);            System.arraycopy(delimiter, 0, total, bytes.length, delimiter.length);             out.writeBytes(total);        }    }}

先看Decoder解碼器,這個是用來netty收到消息後,進行解碼,將字節轉為對象(自定義的HotKeyMsg)用的。裡面有一堆被我注釋掉了,注釋掉的,應該在網上找到的帖子都是那麼寫的。這種方式本身在普通場景下是沒問題的,解碼還算正常,但是當上幾十萬時非常容易出現粘包問題。所以我是在這個解碼器前增加了一個DelimiterBasedFrameDecoder分隔符解碼器。

當收到消息時,先過這個分隔符解碼器,之後到MsgDecoder那裡時,就是已經分隔好的一個對象字節流了,就可以直接用proto工具類進行反序列化的。Constant.DELIMITER是我自定義的一個特殊字符串,用來做分隔符。

再看encoder,編碼器,首先將要傳輸的對象用ProtostuffUtils序列化為byte[],然後在尾巴上掛上我自定義的那個分隔符。這樣在對外發送對象時,就會走這個編碼器,並被加上分隔符。

對應的server端代碼大概是這樣:

之後在Handler裡就可以直接使用這個傳輸的對象了。

再看client端

和Server端是一樣的,也是這幾個編解碼器,沒有區別。因為netty和server之間通訊,我都是用的同一個對象定義。

同理handler也是一樣的。

單機和集群

以上都寫完後,其實就可以測試了,我們可以啟動一個client,一個server,然後搞個死循環往Server發這個對象了,然後你在server端在收到這個對象後,再直接把這個對象也寫回來,原樣發送到客戶端。會發現運行的很順暢,每秒發N萬個沒問題,編解碼都正常,client和server端都比較正常,當前前提是ProtoBuf的工具類和我的一樣,不要共享那個buffer。網上找的文章基本上到這樣也就結束了,隨便發幾個消息沒問題也就算OK。然而實際上,這種代碼上線後,會坑的不要不要的。

其實本地測試也很容易,再啟動幾個客戶端,都連同一個Server,然後給他死循環發對象,再看看兩端會不會有異常。這種情況下,和第一種的區別其實客戶端沒什麼變化,Server端就有變化了,之前同時只給一個client發消息,現在同時給兩個client發消息,這一步如果不謹慎就會出問題了,建議自行嘗試。

之後,我們再加點料,我啟動兩個Server,分別用兩個埠,線上其實是兩臺不同的server伺服器,client會同時往兩臺server死循環發對象,如下圖代碼。

發消息,我們常用的就是channel.writeAndFlush(),大家可以把那個sync去掉,然後跑一下代碼看看。會發現異常拋的一坨一坨的。我們明明是往兩個不同的channel發消息,只不過時間是同時,結果就是發生了嚴重的粘包。server端收到的消息很多都是不規範的,會大量報錯。如果在兩個channel發送間隔100ms,情況就解決了。當然,最終我們可以使用sync同步發送,這樣就不會拋異常了。

以上代碼經測試,40臺client,2臺Server,平均每個server每秒大概接收40萬個對象,可以持續穩定運行。

相關焦點

  • 實戰 | Netty實現單機壓測秒級接收35萬個對象
    當然,壓測時我用的數據量大、發送請求非常密集,單機是每秒前100ms發送2萬個對象,其他900ms歇息,死循環發送,共計40臺機器作為客戶端,同時往2臺netty Server伺服器發送對象,那麼平均每個server每秒大概要接收40萬個對象,由於後面還有業務邏輯,邏輯每秒只能處理35萬實測。
  • 實戰|Netty+Protostuff實現單機壓測秒級接收35萬個對象
    當然,壓測時我用的數據量大、發送請求非常密集,單機是每秒前100ms發送2萬個對象,其他900ms歇息,死循環發送,共計40臺機器作為客戶端,同時往2臺netty Server伺服器發送對象,那麼平均每個server每秒大概要接收40萬個對象,由於後面還有業務邏輯,邏輯每秒只能處理35萬實測。
  • 實戰 | Netty+Protostuff實現單機壓測秒級接收35萬個對象
    當然,壓測時我用的數據量大、發送請求非常密集,單機是每秒前100ms發送2萬個對象,其他900ms歇息,死循環發送,共計40臺機器作為客戶端,同時往2臺netty Server伺服器發送對象,那麼平均每個server每秒大概要接收40萬個對象,由於後面還有業務邏輯,邏輯每秒只能處理35萬實測。
  • Netty結合Protostuff傳輸35萬個對象案例
    當然,壓測時我用的數據量大、發送請求非常密集,單機是每秒前100ms發送2萬個對象,其他900ms歇息,死循環發送,共計40臺機器作為客戶端,同時往2臺netty Server伺服器發送對象,那麼平均每個server每秒大概要接收40萬個對象,由於後面還有業務邏輯,邏輯每秒只能處理35萬實測。
  • 京東開源熱key探測中間件單機qps,2萬提升至35萬實錄
    40多萬個待測key,並計算完畢其中的35萬左右。也就是說,16核機器,目前是32個netty IO線程,32個消費者業務線程。再次上線,cpu日常佔用率降低到7%-10%左右。 二次壓測開始,從下圖每10秒列印一次的日誌來看,單機秒級計算完畢的key在8萬多,同時秒級推送量在10萬左右。
  • 京東開源熱key探測中間件單機qps,2萬提升至35萬實錄
    N臺伺服器發來的40多萬個待測key,並計算完畢其中的35萬左右。也就是說,16核機器,目前是32個netty IO線程,32個消費者業務線程。再次上線,cpu日常佔用率降低到7%-10%左右。二次壓測開始,從下圖每10秒列印一次的日誌來看,單機秒級計算完畢的key在8萬多,同時秒級推送量在10萬左右。
  • 京東熱key探測中間件單機qps 2萬升至35萬實錄,服了
    N臺伺服器發來的40多萬個待測key,並計算完畢其中的35萬左右。也就是說,16核機器,目前是32個netty IO線程,32個消費者業務線程。再次上線,cpu日常佔用率降低到7%-10%左右。二次壓測開始,從下圖每10秒列印一次的日誌來看,單機秒級計算完畢的key在8萬多,同時秒級推送量在10萬左右。
  • Netty實戰:如何讓單機下Netty支持百萬長連接?
    這表明這臺Linux系統最多允許同時打開(即包含所有用戶打開文件數總和)98566個文件,是Linux系統級硬限制,所有用戶級的打開文件數限制都不應超過這個數值。在百萬級的情況下,需要為每個接入的端側設備至少分配一個接收和發送緩衝區對象,採用傳統的非池模式,每次消息讀寫都需要創建和釋放 ByteBuf對象,如果有100萬個連接,每秒上報一次數據或者心跳,就會有100
  • netty快速入門教程
    開始之前在開始之前我們先說明下開發環境,我們使用netty-4.1.30這個版本,jdk使用1.8及以上版本。在這個例子中我們實現了一個服務端的應用,因此會有2個 會被NioEventLoopGroup使用。第一個經常被叫做『boss』,用來接收進來的連接。第二個經常被叫做『worker』,用來處理已經被接收的連接, 一旦『boss』接收到連接,就會把連接信息註冊到『worker』上。
  • 如何在SpringBoot中,使用Netty實現遠程調用?
    一個SelectionKey鍵表示了一個特定的通道對象和一個特定的選擇器對象之間的註冊關係。上圖,簡單地可以描述為"boss接活,讓work幹":manReactor用來接收請求--netty--><dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version></dependency>
  • Dubbo源碼學習——從源碼看看dubbo對netty的使用
    前言前段時間,從頭開始將netty源碼了解了個大概,但都是原理上理解。剛好博主對dubbo框架了解過一些,這次就以dubbo框架為例,詳細看看dubbo這種出色的開源框架是如何使用netty的,又是如何與框架本身邏輯進行融合的。
  • Netty的使用:Client端
    ;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup
  • 窺探 Netty 源碼!Recycler 對象池實現原理剖析
    Recycler 在 netty 中被如何使用Recycler 對象池在 netty 中最重要的使用,就在於 netty 的池化 ByteBuf 的場景下。首先,何為池化?以 PooledDirectByteBuf 舉例,每一個 PooledDirectByteBuf 在應用線程中使用完畢之後,並不會被釋放,而是等待被重新利用,類比線程池每個線程在執行完畢之後不會被立即釋放,而是等待下一次執行的時候被重新利用。所謂的對象池也是如此,池化減少了 ByteBuf 創建和銷毀的開銷,也是 netty 高性能表現的基石之一。
  • 京東開源,秒殺,限流,中間件,單機QPS 提升至 37 萬
    HotKey在618穩定版0.2版基礎上,引入了protobuf序列化方式,並優化了傳輸對象。worker單機性能從618大促穩定版的20萬QPS穩定,30萬極限,提升至30萬穩定,37萬極限。且cpu峰值下降了15%。
  • 徹底搞懂 Netty 線程模型
    這樣即使幾十萬個請求同時到來也無所謂了。Netty 之所以單機支持百萬級別並發量,就是因為一主多從的線程模型。網絡連接的配置參數 (例如接收緩衝區大小)提供異步的網絡 I/O 操作(如建立連接,讀寫,綁定埠),異步調用意味著任何 I/O 調用都將立即返回,並且不保證在調用結束時所請求的 I/O 操作已完成。
  • RocketMQ篇11:基於Netty的通信實現
    重新實現的ByteBuf特性包括允許使用自定義的緩存類型、透明的零拷貝實現、比ByteBuffer更快的相應速度。字節緩存在網絡通信中會被頻繁地使用,ByteBuf實現的是一個非常輕量級的字節數組包裝器。ByteBuf有讀操作和寫操作,為了方便用戶使用,該緩衝區維護了讀索引和寫索引。ByteBuf由三個片段組成:廢棄段、可讀段和可寫段。
  • MqttWk 1.0.0-netty 發布,Java MQTT服務及消息代理
    MqttWk by netty基於 nutzboot + netty + redis + kafka 實現的MQTT服務broker(另有t-io版本分支,暫未發布)本項目代碼主要來源於
  • 蘇寧超6億會員如何做到秒級用戶畫像查詢?
    【51CTO.com原創稿件】想做營銷活動,如何找到目標人群及用戶特徵?人群的篩選通常離不開用戶畫像。隨著數據的日益增多,如何對 6 億+用戶千億級別的標籤數據進行秒級用戶畫像?本文將帶來用戶畫像技術的新發展和架構實踐,介紹基於 ClickHouse 定製開發的標籤平臺,真正做到海量標籤數據的快速導入和秒級用戶畫像查詢分析,提供一整套從前期人群篩選到後期的營銷策略優化的標籤體系。
  • 京東熱 key 探測框架新版發布,單機 QPS 可達 35 萬
    有時候這種宕機發生後,其他功能都是可以使用的,只是和這個熱點有關的內容會無法訪問,這其實就和熱點數據有關係了。一般情況下,我們會把熱點數據緩存下來,而緩存一般都需要有個固定的key,所以,很多時候我們也稱這類問題為熱key問題。
  • 深入理解Netty編解碼、粘包拆包、心跳機制
    Netty粘包拆包TCP 粘包拆包是指發送方發送的若干包數據到接收方接收時粘成一包或某個數據包被拆開接收。收發兩端(客戶端和伺服器端)都要有成對的 socket,因此,發送端為了將多個發送給接收端的包,更有效的發送給對方,使用了優化方法(Nagle算法),將多次間隔較少且數據量小的數據,合併成一個大的數據塊,然後進行封包,這樣做雖然提供了效率,但是接收端就難以分辨出完整的數據包了,因為面向流的通信是無消息保護邊界的。