這個通訊庫很簡潔、功能也比較弱,簡潔是因為沒有疊加太多東西進去,就是把golang的net庫包裝了一下,使得使用上更簡單,本質上是在golang的net庫增加了一些功能,倉庫地址:github.com/apache/dubbo-getty
先說一下客戶端是如何實現的,然後再說Server端如何實現,在分析對方的實現之前,先想一下,假設我們自己來實現這樣一個所謂的tpc通信庫,該怎麼搞呢?試想如下:
通信是雙方的事情,所以抽象出一個Endpoint接口
支持設置讀寫超時時間
支持收發數據時進行壓縮,消耗一些cpu,換來數據流量的減少
統計收發的報文數量,收發的字節數量
要支持序列化和反序列化,實現業務對象和字節數組的互相轉換,畢竟通過socket發送的是字節流
要支持字節流的編解碼,否則沒有邊界的tcp字節流,沒法區分數據包,這一點在netty中實現的很好,但是getty沒有實現,這需要應用開發人員自己搞定,而在dobbo-java版本中也是由框架搞定的,我們不需要操心,所以用dubbo-getty的話,就需要自己搞定協議的編解碼
支持監控tcp連接的活躍,可以關閉不活躍的tcp連接,這就要求記住任何一個tcp連接最後一次發送數據的時間戳
可能有人對序列化/反序列化、通信協議的編解碼這2個概念會有混淆,核心是因為一些rpc框架提供的API接口,是基於業務對象的send方法,方法直接接受的是業務對象,框架會自己搞定序列化和編解碼,所以如果只是調用別人寫好的框架是感受不到細節的,這裡說一下區別。
各種程式語言中把業務對象序列化為字節數組並不是編解碼,因為tcp是面向流的協議,N多次rpc調用產生的字節數組會混在一起奔向網絡通信的另一端,那如何從奔流不息的字節流中準確區分出一個又一個報文,這就是編解碼做的事情,貼一張圖例來說明編解碼的作用(要看到messageBody才是序列化後的字節對象,而真正通過socket發出去的字節流裡面還有一些Header字節呢):
getty抽象類一個EndPoint接口,裡面還用到了一個任務池(這個後面再說,本質就是協程池,和線程池差不多理解)gxsync.GenericTaskPool,核心其實是它定義的RunEventLoop方法,這裡先大概說一下,tcp的雙方收發數據,因為tcp是無限流,所以收數據必然要在一個無限循環(java裡面是線程,golang裡面是協程)中永久讀取數據的,所以這裡才取名叫Loop,有循環的意思。
// EndPoint represents the identity of the client/servertype EndPoint interface { // ID get EndPoint ID ID() EndPointID // EndPointType get endpoint type EndPointType() EndPointType // RunEventLoop run event loop and serves client request. RunEventLoop(newSession NewSessionCallback) // IsClosed check the endpoint has been closed IsClosed() bool // Close close the endpoint and free its resource Close() // GetTaskPool get task pool implemented by dubbogo/gost GetTaskPool() gxsync.GenericTaskPool}這個接口有2個實現,一看就明白,如下所示,也是核心實現,這2個都是我們分析的重點實現。
getty並不是一個RPC框架,不提供Request-Response同步模型,客戶端發送出去後,服務端的響應是異步的,需要自己在此基礎上自行實現RPC的Request-Response同步編程模型。
getty的Client接口上並不像通常的api調用,既然api的名字都取成Client了,那麼在Client接口上理所當然應該提供一些Send之類的方法,可是這裡並不是這樣設計的,Client接口內嵌了EndPoint接口,2個接口裡面都沒有所謂的Send方法!
type Client interface { EndPoint}type Server interface { EndPoint}這個Client只不過是個客戶端編程入口類而已,真正的數據收發並不是在Client接口上提供的,Client真正的核心方法是RunEventLoop,同時接受一個NewSessionCallback 回調函數,在這個函數裡面會傳入一個Session對象,這個Session才是表示Client和Server之間的一個通信通道,我們往下看看Session接口
golang並不是純粹的OOP程式語言,所以混合著函數式編程,可以看到這個NewSessionCallback就是一個函數罷了,而傳入函數的參數Session也是我們要分析的重點之一。
type NewSessionCallback func(Session) error
// RunEventLoop run event loop and serves client request.RunEventLoop(newSession NewSessionCallback)session.go才是真正的交互接口,看它的WritePkg方法才是真命天子!一看就知道,傳入一個對像,會把它序列化成字節數組,然後通過底層的net.TcpConn發出去
WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)WriteBytes([]byte) (int, error)WriteBytesArray(...[]byte) (int, error)所以Session接口的實現類必然會包含TCPConn,這個Connection接口是getty定義的一個socket連接
type session struct { name string endPoint EndPoint //這個是用於發送數據的socket通道 Connection}
// 表示一條連接通信雙方的通道,可以看得出來,提供了一些業務層面的功能,//比如設置讀寫超時時間,記錄收發的報文數量,設置壓縮類型,這些都是//最底層的golang的net庫並不會提供的功能,這就需要業務層來自己實現type Connection interface { ID() uint32 SetCompressType(CompressType) LocalAddr() string RemoteAddr() string incReadPkgNum() incWritePkgNum() // UpdateActive update session's active time UpdateActive() // GetActive get session's active time GetActive() time.Time readTimeout() time.Duration // SetReadTimeout sets deadline for the future read calls. SetReadTimeout(time.Duration) writeTimeout() time.Duration // SetWriteTimeout sets deadline for the future read calls. SetWriteTimeout(time.Duration) send(interface{}) (int, error) close(int) // set related session setSession(Session)}
//實現了Connection 接口,提供的是基於tcp的連接type gettyTCPConn struct { gettyConn reader io.Reader writer io.Writer //這就是底層的網絡socket連接,golang的net.Conn是接口, //這裡其實是net.TCPConn結構體 conn net.Conn}
// TCPConn is an implementation of the Conn interface for TCP network connections.type TCPConn struct { conn}根據上面對幾個結構體的闡述,客戶端的Client真正的核心方法是RunEventLoop,一定會在RunEventLoop方法中做幾個事情:
1、建立和服務端的tcp連接,也就是會調用net.Dial方法
2、構造Connection對象
3、構造Session對象
4、回調type NewSessionCallback func(Session) error,所以說golang是一個函數式程式語言,它是可以把函數傳來傳去的,和C語言的函數指針差不多
5、一旦RunEventLoop方法執行完成,一定要找個什麼地方持有對Session接口的引用,否則客戶端代碼就沒法通過Session.WritePkg方法發送數據包了,可把這個Session接口的引用作為全局變量或者某個自己業務類的局部變量
之所以說getty不是一個RPC框架,就看Session.WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error),你看它返回的是待發送的字節數、實際發送的字節數,這個完全是從網絡通信的客戶端的角度出發的,並沒有返回服務端的Response數據。
一個RPC的客戶端API設計一定會設計Send、Write之類的方法,都會返回Response對象,這樣程序代碼就可以繼續往下走,根據返回的Response中的數據進行業務代碼的編寫,而很明顯,getty並沒有提供,當客戶端調用WritePkg後,是沒法得到服務端的返回數據的,那怎麼辦,這豈不是沒法搞了,繼續往下走來分析看怎麼弄的,核心是因為getty的定位不一樣。
先來說一下客戶端使用方法,從而了解它的設計思路
1. client := getty.NewTCPClient得到一個 getty.Client 對象
client := getty.NewTCPClient( getty.WithServerAddress(*ip+":8090"), getty.WithConnectionNumber(*connections))2. 調用client.RunEventLoop(newSession NewSessionCallback) 開啟事件循環,Run裡面會真正和服務端建立tcp連接,NewSessionCallback是一個函數,原型是:type NewSessionCallback func(Session) error,RunEventLoop是EndPoint接口上的方法,client和server端都分別進行了實現,從Endpoint的取名上看得出來,一個通信的雙方,必然是由2個端點組成的,2個端點各自維護自己的業務邏輯:
對於client來說,RunEventLoop就是真正和服務端建立tcp連接的地方,那為什麼名稱要取名為RunEventLoop呢,好像是在跑一個什麼事件循環?所謂的循環,其實也就是在一個協程中循環接受服務端的數據,所以叫EventLoop,基於事件的網絡編程,現在已經是標配了,核心來自於linux的epoll模型,所以取名RunEventLoop也不奇怪了。
3. client.go中通過reConnect建立一定數量(默認1)的TCP連接
func (c *client) RunEventLoop(newSession NewSessionCallback) { c.Lock() c.newSession = newSession c.Unlock() c.reConnect()}// a for-loop connect to make sure the connection pool is valid//所謂的reConnect,其實就是在客戶端建立多個tcp連接,//數量可以自由設定,默認是1func (c *client) reConnect() { var num, max, times, interval int max = c.number //這個就是客戶端連接池數量,默認是1 interval = c.reconnectInterval if interval == 0 { //建連失敗後等待一段時間再試 interval = reconnectInterval } for { //循環建立指定數量的tcp連接 if c.IsClosed() { log.Warnf("client{peer:%s} goroutine exit now.", c.addr) break }
num = c.sessionNum() if max <= num { //建立了指定數量的tcp連接後,就break出來 break } c.connect() //在這裡建立tcp連接 times++ if maxTimes < times { times = maxTimes } //這裡其實是一些客戶端建連優化,如果建連失敗,則等待一段時間後再嘗試 //而不是for循環瘋狂嘗試,那沒有意義 <-gxtime.After(time.Duration(int64(times) * int64(interval))) }}func (c *client) connect() { var ( err error ss Session )
for { ss = c.dial() //會調用dialTCP方法,返回的是Session對象 err = c.newSession(ss) //NewSessionCallback回調函數 if err == nil { ss.(*session).run() c.Lock() c.ssMap[ss] = struct{}{} //連接池是用 map[Session]struct{}保存的 c.Unlock() ss.SetAttribute(sessionClientKey, c) break } }}
func (c *client) dialTCP() Session { var ( err error conn net.Conn ) for { if c.IsClosed() { return nil } //調用golang的net庫中DialTimeout函數,建立tcp連接 conn, err = net.DialTimeout("tcp", c.addr, connectTimeout) if err == nil { return newTCPSession(conn, c) } //無法建立tcp連接,等待connectInterval一段時間後再試, //瘋狂的嘗試建立連接,是沒有意義的,所以要等待一段時間 log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err)) <-gxtime.After(connectInterval) }}//把net.Conn包裝成gettyTCPConnfunc newTCPSession(conn net.Conn, endPoint EndPoint) Session { var c *gettyTCPConn = newGettyTCPConn(conn) session := newSession(endPoint, c) session.name = defaultTCPSessionName
return session}4. 拿到Session對象後就好辦了,會調用Session.run方法開啟goroutine,最終會調用func (s *session) handleTCPPackage() error ,這個handleTCPPackage是核心所在,這裡面就是一系列從net.Conn上讀取數據包的過程了,核心是啟動一個for永久循環,一直在後臺跑著
func (s *session) handleTCPPackage() error { var( bufp *[]byte buf []byte pktBuf *bytes.Buffer ) bufp = gxbytes.GetBytes(maxReadBufLen) buf = *bufp pktBuf = gxbytes.GetBytesBuffer() conn = s.Connection.(*gettyTCPConn) for { bufLen, err = conn.recv(buf) } if 0 == bufLen { continue } pktBuf.Write(buf[:bufLen]) for { if pktBuf.Len() <= 0 { break } pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) s.UpdateActive() s.addTask(pkg) pktBuf.Next(pkgLen) } }上面這段代碼需要仔細解釋一下,具體的步驟是:
從socket裡面讀取字節,也就是conn.recv方法,但是並不知道這次可以讀到多少字節,因為tcp流不像udp報文,那怎麼才能從流中讀取一個完整的服務端發過來的封包,或者服務端怎麼才能完整讀取到客戶端發過來的一個完整的封包呢?這就是網絡編程中編解碼的作用了
conn.recv會返回本次讀取到的字節數,如果=0就繼續讀,continue即可
如果能夠讀取到>0的字節數,此時怎麼解釋這些字節呢,getty框架或者說任何一個網絡框架它自己是不知道的,這個必須交給應用程式開發人員,只有應用程式開發人員才知道如何解釋這些字節,在getty中這是靠Session.SetPkgHandler(ReadWriter)實現的
從字節流中把字節切片轉換為業務對象,這就是通信協議,一般現在用的都是固定長度的header+變長的body,這就要應用程式開發人員自己編碼實現了,完善的RPC框架會幫你把這件事情做掉,但是getty沒有那麼完善,所以這個事情得自己做,一般就是用固定長度的header比如4個字節表示整體報文長度,然後後面跟著body的字節就行了,所以一般就是先讀取4個字節,把這4個字節轉換為int,比如得到13325,那就表示本次client發過來的完整報文是13325,去掉4個字節的頭部,剩下13321位元組,把這13321位元組按照約定的序列化方法,直接轉換為golang結構體就行了。現在一般用pb,所以得到了字節切片,就可以用pb直接反序列化為golang結構體、反序列化為java對象、c++對象、python對象都可以,pb是跨語言的,很不錯的序列化框架。
Session結構體裡面的Reader接口定義如下:
// Reader is used to unmarshal a complete pkg from buffertype Reader interface { // Read Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg. // When receiving a tcp network streaming segment, there are 4 cases as following: // case 1: a error found in the streaming segment; // case 2: can not unmarshal a pkg header from the streaming segment; // case 3: unmarshal a pkg header but can not unmarshal a pkg from the streaming segment; // case 4: just unmarshal a pkg from the streaming segment; // case 5: unmarshal more than one pkg from the streaming segment; // // The return value is (nil, 0, error) as case 1. // The return value is (nil, 0, nil) as case 2. // The return value is (nil, pkgLen, nil) as case 3. // The return value is (pkg, pkgLen, nil) as case 4. // The handleTcpPackage may invoke func Read many times as case 5. Read(Session, []byte) (interface{}, int, error)}5. 所以客戶端需要在NewSessionCallback回調函數中註冊一個編解碼實現類,用來進行真正的編解碼,調用Session.SetPkgHandler(ReadWriter) 方法註冊一個packet處理器即可
session.SetPkgHandler(pkgHandler)session.SetEventListener(EventListener)所謂的PkgHandler,只要是一個結構體,實現ReadWriter接口即可,調用Session.SetEventListener(EventListener)方法註冊事件監聽,尤其是它的OnMessage方法,收到字節流解碼成golang結構體後,就會調用OnMessage
type EventListener interface { OnOpen(Session) error OnClose(Session) OnError(Session, error) OnCron(Session) OnMessage(Session, interface{})}6. Session接口上的 SetEventListener(EventListener)和SetPkgHandler(ReadWriter)的調用順序需要注意一下,getty是從socket中讀取到字節後,先調用PakHandler把字節轉換為golang結構體業務對象,然後再執行EventListener的OnMessage(Session,interface{})方法
未完待續,還有Server端的實現以及Session的詳細剖析...