單純netty結合protostuff進行rpc對象傳輸的demo網上有很多,大部分都是一個模子刻出來的,一開始我也是抄了一個,本地測試暢通無阻,未發生任何異常。
部署預發環境,進行壓測後,問題巨多,各種報錯層出不窮。當然,壓測時我用的數據量大、發送請求非常密集,單機是每秒前100ms發送2萬個對象,其他900ms歇息,死循環發送,共計40臺機器作為客戶端,同時往2臺netty Server伺服器發送對象,那麼平均每個server每秒大概要接收40萬個對象,由於後面還有業務邏輯,邏輯每秒只能處理35萬實測。
對於網上的代碼,進行了多次修改,反覆測試,最終是達到了不報錯無異常,單機秒級接收35萬個對象以上,故寫篇文章記錄一下,文中代碼會和線上邏輯保持一致。
這個沒什麼特殊的,網上找個工具類就好了。
引入pom
public class ProtostuffUtils { /** * 避免每次序列化都重新申請Buffer空間 * 這句話在實際生產上沒有意義,耗時減少的極小,但高並發下,如果還用這個buffer,會報異常說buffer還沒清空,就又被使用了 */// private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); /** * 緩存Schema */ private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>(); /** * 序列化方法,把指定對象序列化成字節數組 * * @param obj * @param <T> * @return */ @SuppressWarnings("unchecked") public static <T> byte[] serialize(T obj) { Class<T> clazz = (Class<T>) obj.getClass(); Schema<T> schema = getSchema(clazz); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); byte[] data; try { data = ProtobufIOUtil.toByteArray(obj, schema, buffer);// data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); } finally { buffer.clear(); } return data; } /** * 反序列化方法,將字節數組反序列化成指定Class類型 * * @param data * @param clazz * @param <T> * @return */ public static <T> T deserialize(byte[] data, Class<T> clazz) { Schema<T> schema = getSchema(clazz); T obj = schema.newMessage(); ProtobufIOUtil.mergeFrom(data, obj, schema);// ProtostuffIOUtil.mergeFrom(data, obj, schema); return obj; } @SuppressWarnings("unchecked") private static <T> Schema<T> getSchema(Class<T> clazz) { Schema<T> schema = (Schema<T>) schemaCache.get(clazz); if (Objects.isNull(schema)) { //這個schema通過RuntimeSchema進行懶創建並緩存 //所以可以一直調用RuntimeSchema.getSchema(),這個方法是線程安全的 schema = RuntimeSchema.getSchema(clazz); if (Objects.nonNull(schema)) { schemaCache.put(clazz, schema); } } return schema; }}
此處有坑,就是最上面大部分網上代碼都是用了static的buffer。在單線程情況下沒有問題。在多線程情況下,非常容易出現buffer一次使用後尚未被clear,就再次被另一個線程使用,會拋異常。而所謂的避免每次都申請buffer空間,實測性能影響極其微小。
另裡面兩次ProtostuffIOUtil都改成了ProtobufIOUtil,因為也是出過異常,修改後未見有異常。
解碼器decoder:
import com.jd.platform.hotkey.common.model.HotKeyMsg;import com.jd.platform.hotkey.common.tool.ProtostuffUtils;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * @author wuweifeng * @version 1.0 * @date 2020-07-29 */public class MsgDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) { try { byte[] body = new byte[in.readableBytes()]; //傳輸正常 in.readBytes(body); list.add(ProtostuffUtils.deserialize(body, HotKeyMsg.class)); // if (in.readableBytes() < 4) {// return;// }// in.markReaderIndex();// int dataLength = in.readInt();// if (dataLength < 0) {// channelHandlerContext.close();// }// if (in.readableBytes() < dataLength) {// in.resetReaderIndex();// return;// }//// byte[] data = new byte[dataLength];// in.readBytes(data);//// Object obj = ProtostuffUtils.deserialize(data, HotKeyMsg.class);// list.add(obj); } catch (Exception e) { e.printStackTrace(); } }}
編碼器 encoder
import com.jd.platform.hotkey.common.model.HotKeyMsg;import com.jd.platform.hotkey.common.tool.Constant;import com.jd.platform.hotkey.common.tool.ProtostuffUtils;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder; /** * @author wuweifeng * @version 1.0 * @date 2020-07-30 */public class MsgEncoder extends MessageToByteEncoder { @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) { if (in instanceof HotKeyMsg) { byte[] bytes = ProtostuffUtils.serialize(in); byte[] delimiter = Constant.DELIMITER.getBytes(); byte[] total = new byte[bytes.length + delimiter.length]; System.arraycopy(bytes, 0, total, 0, bytes.length); System.arraycopy(delimiter, 0, total, bytes.length, delimiter.length); out.writeBytes(total); } }}
先看Decoder解碼器,這個是用來netty收到消息後,進行解碼,將字節轉為對象(自定義的HotKeyMsg)用的。裡面有一堆被我注釋掉了,注釋掉的,應該在網上找到的帖子都是那麼寫的。這種方式本身在普通場景下是沒問題的,解碼還算正常,但是當上幾十萬時非常容易出現粘包問題。所以我是在這個解碼器前增加了一個DelimiterBasedFrameDecoder分隔符解碼器。
當收到消息時,先過這個分隔符解碼器,之後到MsgDecoder那裡時,就是已經分隔好的一個對象字節流了,就可以直接用proto工具類進行反序列化的。Constant.DELIMITER是我自定義的一個特殊字符串,用來做分隔符。
再看encoder,編碼器,首先將要傳輸的對象用ProtostuffUtils序列化為byte[],然後在尾巴上掛上我自定義的那個分隔符。這樣在對外發送對象時,就會走這個編碼器,並被加上分隔符。
對應的server端代碼大概是這樣:
之後在Handler裡就可以直接使用這個傳輸的對象了。
再看client端
和Server端是一樣的,也是這幾個編解碼器,沒有區別。因為netty和server之間通訊,我都是用的同一個對象定義。
同理handler也是一樣的。
以上都寫完後,其實就可以測試了,我們可以啟動一個client,一個server,然後搞個死循環往Server發這個對象了,然後你在server端在收到這個對象後,再直接把這個對象也寫回來,原樣發送到客戶端。會發現運行的很順暢,每秒發N萬個沒問題,編解碼都正常,client和server端都比較正常,當前前提是ProtoBuf的工具類和我的一樣,不要共享那個buffer。網上找的文章基本上到這樣也就結束了,隨便發幾個消息沒問題也就算OK。然而實際上,這種代碼上線後,會坑的不要不要的。
其實本地測試也很容易,再啟動幾個客戶端,都連同一個Server,然後給他死循環發對象,再看看兩端會不會有異常。這種情況下,和第一種的區別其實客戶端沒什麼變化,Server端就有變化了,之前同時只給一個client發消息,現在同時給兩個client發消息,這一步如果不謹慎就會出問題了,建議自行嘗試。
之後,我們再加點料,我啟動兩個Server,分別用兩個埠,線上其實是兩臺不同的server伺服器,client會同時往兩臺server死循環發對象,如下圖代碼。
發消息,我們常用的就是channel.writeAndFlush(),大家可以把那個sync去掉,然後跑一下代碼看看。會發現異常拋的一坨一坨的。我們明明是往兩個不同的channel發消息,只不過時間是同時,結果就是發生了嚴重的粘包。server端收到的消息很多都是不規範的,會大量報錯。如果在兩個channel發送間隔100ms,情況就解決了。當然,最終我們可以使用sync同步發送,這樣就不會拋異常了。
以上代碼經測試,40臺client,2臺Server,平均每個server每秒大概接收40萬個對象,可以持續穩定運行。