用golang實現的dubbo-getty是怎麼進行tcp通信的?客戶端篇

2022-01-14 Code人生

收錄於話題 #golang 1個

這個通訊庫很簡潔、功能也比較弱,簡潔是因為沒有疊加太多東西進去,就是把golang的net庫包裝了一下,使得使用上更簡單,本質上是在golang的net庫增加了一些功能,倉庫地址:github.com/apache/dubbo-getty

先說一下客戶端是如何實現的,然後再說Server端如何實現,在分析對方的實現之前,先想一下,假設我們自己來實現這樣一個所謂的tpc通信庫,該怎麼搞呢?試想如下:

通信是雙方的事情,所以抽象出一個Endpoint接口

支持設置讀寫超時時間

支持收發數據時進行壓縮,消耗一些cpu,換來數據流量的減少

統計收發的報文數量,收發的字節數量

要支持序列化和反序列化,實現業務對象和字節數組的互相轉換,畢竟通過socket發送的是字節流

要支持字節流的編解碼,否則沒有邊界的tcp字節流,沒法區分數據包,這一點在netty中實現的很好,但是getty沒有實現,這需要應用開發人員自己搞定,而在dobbo-java版本中也是由框架搞定的,我們不需要操心,所以用dubbo-getty的話,就需要自己搞定協議的編解碼

支持監控tcp連接的活躍,可以關閉不活躍的tcp連接,這就要求記住任何一個tcp連接最後一次發送數據的時間戳

可能有人對序列化/反序列化、通信協議的編解碼這2個概念會有混淆,核心是因為一些rpc框架提供的API接口,是基於業務對象的send方法,方法直接接受的是業務對象,框架會自己搞定序列化和編解碼,所以如果只是調用別人寫好的框架是感受不到細節的,這裡說一下區別。

各種程式語言中把業務對象序列化為字節數組並不是編解碼,因為tcp是面向流的協議,N多次rpc調用產生的字節數組會混在一起奔向網絡通信的另一端,那如何從奔流不息的字節流中準確區分出一個又一個報文,這就是編解碼做的事情,貼一張圖例來說明編解碼的作用(要看到messageBody才是序列化後的字節對象,而真正通過socket發出去的字節流裡面還有一些Header字節呢):

getty抽象類一個EndPoint接口,裡面還用到了一個任務池(這個後面再說,本質就是協程池,和線程池差不多理解)gxsync.GenericTaskPool,核心其實是它定義的RunEventLoop方法,這裡先大概說一下,tcp的雙方收發數據,因為tcp是無限流,所以收數據必然要在一個無限循環(java裡面是線程,golang裡面是協程)中永久讀取數據的,所以這裡才取名叫Loop,有循環的意思。

// EndPoint represents the identity of the client/servertype EndPoint interface {  // ID get EndPoint ID  ID() EndPointID  // EndPointType get endpoint type  EndPointType() EndPointType  // RunEventLoop run event loop and serves client request.  RunEventLoop(newSession NewSessionCallback)  // IsClosed check the endpoint has been closed  IsClosed() bool  // Close close the endpoint and free its resource  Close()  // GetTaskPool get task pool implemented by dubbogo/gost  GetTaskPool() gxsync.GenericTaskPool}

這個接口有2個實現,一看就明白,如下所示,也是核心實現,這2個都是我們分析的重點實現。

getty並不是一個RPC框架,不提供Request-Response同步模型,客戶端發送出去後,服務端的響應是異步的,需要自己在此基礎上自行實現RPC的Request-Response同步編程模型。

getty的Client接口上並不像通常的api調用,既然api的名字都取成Client了,那麼在Client接口上理所當然應該提供一些Send之類的方法,可是這裡並不是這樣設計的,Client接口內嵌了EndPoint接口,2個接口裡面都沒有所謂的Send方法!

type Client interface {  EndPoint}type Server interface {  EndPoint}

這個Client只不過是個客戶端編程入口類而已,真正的數據收發並不是在Client接口上提供的,Client真正的核心方法是RunEventLoop,同時接受一個NewSessionCallback 回調函數,在這個函數裡面會傳入一個Session對象,這個Session才是表示Client和Server之間的一個通信通道,我們往下看看Session接口

golang並不是純粹的OOP程式語言,所以混合著函數式編程,可以看到這個NewSessionCallback就是一個函數罷了,而傳入函數的參數Session也是我們要分析的重點之一。

type NewSessionCallback func(Session) error
// RunEventLoop run event loop and serves client request.RunEventLoop(newSession NewSessionCallback)

session.go才是真正的交互接口,看它的WritePkg方法才是真命天子!一看就知道,傳入一個對像,會把它序列化成字節數組,然後通過底層的net.TcpConn發出去

WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)WriteBytes([]byte) (int, error)WriteBytesArray(...[]byte) (int, error)

所以Session接口的實現類必然會包含TCPConn,這個Connection接口是getty定義的一個socket連接

type session struct {   name     string   endPoint EndPoint   //這個是用於發送數據的socket通道   Connection}
// 表示一條連接通信雙方的通道,可以看得出來,提供了一些業務層面的功能,//比如設置讀寫超時時間,記錄收發的報文數量,設置壓縮類型,這些都是//最底層的golang的net庫並不會提供的功能,這就需要業務層來自己實現type Connection interface { ID() uint32 SetCompressType(CompressType) LocalAddr() string RemoteAddr() string incReadPkgNum() incWritePkgNum() // UpdateActive update session's active time UpdateActive() // GetActive get session's active time GetActive() time.Time readTimeout() time.Duration // SetReadTimeout sets deadline for the future read calls. SetReadTimeout(time.Duration) writeTimeout() time.Duration // SetWriteTimeout sets deadline for the future read calls. SetWriteTimeout(time.Duration) send(interface{}) (int, error) close(int) // set related session setSession(Session)}
//實現了Connection 接口,提供的是基於tcp的連接type gettyTCPConn struct { gettyConn reader io.Reader writer io.Writer   //這就是底層的網絡socket連接,golang的net.Conn是接口,   //這裡其實是net.TCPConn結構體   conn   net.Conn}
// TCPConn is an implementation of the Conn interface for TCP network connections.type TCPConn struct { conn}

根據上面對幾個結構體的闡述,客戶端的Client真正的核心方法是RunEventLoop,一定會在RunEventLoop方法中做幾個事情:

1、建立和服務端的tcp連接,也就是會調用net.Dial方法

2、構造Connection對象

3、構造Session對象

4、回調type NewSessionCallback func(Session) error,所以說golang是一個函數式程式語言,它是可以把函數傳來傳去的,和C語言的函數指針差不多

5、一旦RunEventLoop方法執行完成,一定要找個什麼地方持有對Session接口的引用,否則客戶端代碼就沒法通過Session.WritePkg方法發送數據包了,可把這個Session接口的引用作為全局變量或者某個自己業務類的局部變量

之所以說getty不是一個RPC框架,就看Session.WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error),你看它返回的是待發送的字節數、實際發送的字節數,這個完全是從網絡通信的客戶端的角度出發的,並沒有返回服務端的Response數據。

一個RPC的客戶端API設計一定會設計Send、Write之類的方法,都會返回Response對象,這樣程序代碼就可以繼續往下走,根據返回的Response中的數據進行業務代碼的編寫,而很明顯,getty並沒有提供,當客戶端調用WritePkg後,是沒法得到服務端的返回數據的,那怎麼辦,這豈不是沒法搞了,繼續往下走來分析看怎麼弄的,核心是因為getty的定位不一樣。

先來說一下客戶端使用方法,從而了解它的設計思路

1. client := getty.NewTCPClient得到一個 getty.Client 對象

client := getty.NewTCPClient(   getty.WithServerAddress(*ip+":8090"),   getty.WithConnectionNumber(*connections))

2. 調用client.RunEventLoop(newSession NewSessionCallback) 開啟事件循環,Run裡面會真正和服務端建立tcp連接,NewSessionCallback是一個函數,原型是:type NewSessionCallback func(Session) error,RunEventLoop是EndPoint接口上的方法,client和server端都分別進行了實現,從Endpoint的取名上看得出來,一個通信的雙方,必然是由2個端點組成的,2個端點各自維護自己的業務邏輯:

對於client來說,RunEventLoop就是真正和服務端建立tcp連接的地方,那為什麼名稱要取名為RunEventLoop呢,好像是在跑一個什麼事件循環?所謂的循環,其實也就是在一個協程中循環接受服務端的數據,所以叫EventLoop,基於事件的網絡編程,現在已經是標配了,核心來自於linux的epoll模型,所以取名RunEventLoop也不奇怪了。

3. client.go中通過reConnect建立一定數量(默認1)的TCP連接

func (c *client) RunEventLoop(newSession NewSessionCallback) {   c.Lock()   c.newSession = newSession   c.Unlock()   c.reConnect()}

// a for-loop connect to make sure the connection pool is valid//所謂的reConnect,其實就是在客戶端建立多個tcp連接,//數量可以自由設定,默認是1func (c *client) reConnect() {  var num, max, times, interval int  max = c.number  //這個就是客戶端連接池數量,默認是1  interval = c.reconnectInterval  if interval == 0 { //建連失敗後等待一段時間再試    interval = reconnectInterval  }  for {  //循環建立指定數量的tcp連接    if c.IsClosed() {      log.Warnf("client{peer:%s} goroutine exit now.", c.addr)      break    }
num = c.sessionNum()    if max <= num { //建立了指定數量的tcp連接後,就break出來 break }    c.connect()  //在這裡建立tcp連接 times++ if maxTimes < times { times = maxTimes }    //這裡其實是一些客戶端建連優化,如果建連失敗,則等待一段時間後再嘗試    //而不是for循環瘋狂嘗試,那沒有意義 <-gxtime.After(time.Duration(int64(times) * int64(interval))) }}

func (c *client) connect() {  var (    err error    ss  Session  )
for { ss = c.dial() //會調用dialTCP方法,返回的是Session對象    err = c.newSession(ss)  //NewSessionCallback回調函數 if err == nil { ss.(*session).run()      c.Lock()      c.ssMap[ss] = struct{}{}  //連接池是用 map[Session]struct{}保存的 c.Unlock() ss.SetAttribute(sessionClientKey, c) break    } }}
func (c *client) dialTCP() Session { var ( err error conn net.Conn  ) for { if c.IsClosed() { return nil    }    //調用golang的net庫中DialTimeout函數,建立tcp連接    conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)     if err == nil { return newTCPSession(conn, c)    }    //無法建立tcp連接,等待connectInterval一段時間後再試,    //瘋狂的嘗試建立連接,是沒有意義的,所以要等待一段時間    log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err)) <-gxtime.After(connectInterval) }}//把net.Conn包裝成gettyTCPConnfunc newTCPSession(conn net.Conn, endPoint EndPoint) Session { var c *gettyTCPConn = newGettyTCPConn(conn) session := newSession(endPoint, c) session.name = defaultTCPSessionName
return session}

4. 拿到Session對象後就好辦了,會調用Session.run方法開啟goroutine,最終會調用func (s *session) handleTCPPackage() error ,這個handleTCPPackage是核心所在,這裡面就是一系列從net.Conn上讀取數據包的過程了,核心是啟動一個for永久循環,一直在後臺跑著

func (s *session) handleTCPPackage() error {    var(        bufp     *[]byte        buf      []byte        pktBuf   *bytes.Buffer    )    bufp = gxbytes.GetBytes(maxReadBufLen)    buf = *bufp    pktBuf = gxbytes.GetBytesBuffer()    conn = s.Connection.(*gettyTCPConn)    for {        bufLen, err = conn.recv(buf)      }     if 0 == bufLen {         continue     }    pktBuf.Write(buf[:bufLen])      for {         if pktBuf.Len() <= 0 {            break       }       pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())        s.UpdateActive()       s.addTask(pkg)       pktBuf.Next(pkgLen)    }    }

上面這段代碼需要仔細解釋一下,具體的步驟是:

從socket裡面讀取字節,也就是conn.recv方法,但是並不知道這次可以讀到多少字節,因為tcp流不像udp報文,那怎麼才能從流中讀取一個完整的服務端發過來的封包,或者服務端怎麼才能完整讀取到客戶端發過來的一個完整的封包呢?這就是網絡編程中編解碼的作用了

conn.recv會返回本次讀取到的字節數,如果=0就繼續讀,continue即可

如果能夠讀取到>0的字節數,此時怎麼解釋這些字節呢,getty框架或者說任何一個網絡框架它自己是不知道的,這個必須交給應用程式開發人員,只有應用程式開發人員才知道如何解釋這些字節,在getty中這是靠Session.SetPkgHandler(ReadWriter)實現的

從字節流中把字節切片轉換為業務對象,這就是通信協議,一般現在用的都是固定長度的header+變長的body,這就要應用程式開發人員自己編碼實現了,完善的RPC框架會幫你把這件事情做掉,但是getty沒有那麼完善,所以這個事情得自己做,一般就是用固定長度的header比如4個字節表示整體報文長度,然後後面跟著body的字節就行了,所以一般就是先讀取4個字節,把這4個字節轉換為int,比如得到13325,那就表示本次client發過來的完整報文是13325,去掉4個字節的頭部,剩下13321位元組,把這13321位元組按照約定的序列化方法,直接轉換為golang結構體就行了。現在一般用pb,所以得到了字節切片,就可以用pb直接反序列化為golang結構體、反序列化為java對象、c++對象、python對象都可以,pb是跨語言的,很不錯的序列化框架。

Session結構體裡面的Reader接口定義如下:

// Reader is used to unmarshal a complete pkg from buffertype Reader interface {   // Read Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.   // When receiving a tcp network streaming segment, there are 4 cases as following:   // case 1: a error found in the streaming segment;   // case 2: can not unmarshal a pkg header from the streaming segment;   // case 3: unmarshal a pkg header but can not unmarshal a pkg from the streaming segment;   // case 4: just unmarshal a pkg from the streaming segment;   // case 5: unmarshal more than one pkg from the streaming segment;   //   // The return value is (nil, 0, error) as case 1.   // The return value is (nil, 0, nil) as case 2.   // The return value is (nil, pkgLen, nil) as case 3.   // The return value is (pkg, pkgLen, nil) as case 4.   // The handleTcpPackage may invoke func Read many times as case 5.   Read(Session, []byte) (interface{}, int, error)}

5. 所以客戶端需要在NewSessionCallback回調函數中註冊一個編解碼實現類,用來進行真正的編解碼,調用Session.SetPkgHandler(ReadWriter) 方法註冊一個packet處理器即可

session.SetPkgHandler(pkgHandler)session.SetEventListener(EventListener)

所謂的PkgHandler,只要是一個結構體,實現ReadWriter接口即可,調用Session.SetEventListener(EventListener)方法註冊事件監聽,尤其是它的OnMessage方法,收到字節流解碼成golang結構體後,就會調用OnMessage

type EventListener interface {    OnOpen(Session) error    OnClose(Session)    OnError(Session, error)    OnCron(Session)  OnMessage(Session, interface{})}

6. Session接口上的 SetEventListener(EventListener)和SetPkgHandler(ReadWriter)的調用順序需要注意一下,getty是從socket中讀取到字節後,先調用PakHandler把字節轉換為golang結構體業務對象,然後再執行EventListener的OnMessage(Session,interface{})方法

未完待續,還有Server端的實現以及Session的詳細剖析...

相關焦點

  • WIFI模塊開發教程之W600網絡篇3:STA模式下TCP Client通信
    前言本文研究如何在STA模式下進行TCP Client通信,STA模式是說模塊直接連接AP(手機熱點或者路由器),進入區域網中和其他無線設備通信,區域網中其他設備作為服務端,WIFI模塊作為客戶端。一、理論基礎本節主要要處理的有一個問題:如何利用RT_Thread連接一個已知的AP,連上AP後,TCP Client程序和網絡篇1中內容完全一致。
  • WIFI模塊開發教程之W600網絡篇1:AP模式下TCP Client通信
    前言本文研究如何在AP模式下進行TCP Client通信,所謂AP模式是說模塊起來一個softAP熱點,可以供其他WIFI設備連接,當其他設備連接成功後,另WIFI模塊作為客戶端,區域網中其他設備作為服務端進行TCP數據通信。
  • TCP長連接和心跳那些事
    其實我個人對 TCP 的很多細節也並沒有完全理解,這篇文章主要針對微信交流群裡有人提出的長連接,心跳的問題,做一個統一的整理。在 Java 中,使用 TCP 通信,大概率會涉及到 Socket、Netty,本文會借用它們的一些 API 和設置參數來輔助介紹。長連接與短連接TCP 本身並沒有長短連接的區別,長短與否,完全取決於我們怎麼用它。
  • Spring Cloud 和 Dubbo,到底用哪個好?
    springcloud的接口協議約定比較自由且鬆散,需要有強有力的行政措施來限制接口無序升級dubbo的註冊中心可以選擇zk,redis等多種,springcloud的註冊中心只能用eureka或者自研但如果我選,我會用 Spring Cloud。
  • 從一個HTTP請求來讀懂HTTP、TCP協議
    數據傳輸的介質也可能多樣,如內網裡通過網線進行傳輸,連接到公網的話會通過光纖進行連接,所以要實現不同介質間信號的轉換,還有從光纖到路由器無線脈衝轉換,距離遠的話還有信號衰減問題。所以在網絡傳輸過程中有非常多的問題需要解決,把問題分組分層,不同層次間解決不同問題,不同層次間定義標準化接口讓它們間可以進行數據的通信。
  • 200SMART的MODBUS TCP通信
    一、200SMART的MODBUS TCP通訊參數S7-200 SMART 支持做 Modbus TCP 的客戶端或者伺服器,可以實現 PLC 之間通信,也可以實現與支持此通信協議的第三方設備通信。通信夥伴數量比較多的時候,可以使用交換機,擴展乙太網接口。
  • Golang 鑽牛角尖篇 之 循環篇
    Golang 鑽牛角尖篇 之 循環篇問題描述1.無限循環func main(){ arr := []int{1,2,3} for _,v := range arr {  arr = append(arr,v) }}
  • 阿里終面:怎麼用 UDP 實現 TCP?
    其實面試官主要是想讓我說出 UDP 和 TCP 的原理上的區別,怎麼給 UDP 加些功能實現 TCP。看好去很容易就能說出一兩個 TCP 和 UDP 的區別,但如果能用女朋友都能聽懂的方式該怎麼說呢?比如面向連接,就是為了在客戶端和服務端維護連接,而建立一定的數據結構來維護雙方交互的狀態,用這樣的數據來保證所謂的面向連接的特性。知道了 TCP 的是用三次握手來建立連接,那我們是否可以讓 UDP 也發三個包來模擬 TCP 建立連接?
  • 技術乾貨 | JDK、Spring、Dubbo SPI 原理介紹
    所以 Java 的核心類庫只提供了資料庫驅動的接口 Java.sql.Driver,不同的資料庫服務提供商可以實現此接口,而開發者只需配置相應資料庫驅動的實現類,JDBC 框架就能自行加載第三方的服務以達到客戶端訪問不同類型的資料庫的功能。
  • 【Golang】使用Golang編寫Hugo發布器
    Hugo 是 Golang 編寫的靜態網站生成器,速度快,易用,可配置,我也是通過golang的學習,發現了Hugo,它不用依賴一大堆東西,一個二進位文件就可以搞定,簡潔。The world’s fastest framework for building websites.
  • 這篇文章很贊:Golang GC 探究
    1.3版本以前,golang的垃圾回收算法都非常簡陋,然後其性能也廣被詬病:go runtime在一定條件下(內存超過閾值或定期如2min),暫停所有任務的執行,進行mark&sweep操作,操作完成後啟動所有任務的執行。在內存使用較多的場景下,go程序在進行垃圾回收時會發生非常明顯的卡頓現象(Stop The World)。
  • 怎麼用VB.NET進行串口通信
    USB與上位機通信,為了做起來簡單一些優先選擇串口通信。朋友請求先在電腦上和指紋模塊把數據調通,那麼我就計劃先寫個電腦桌面小軟體測試一下,找到幾年前做的一個項目,是用VB.NET做的上位機軟體,裡面有串口通信模塊例程可以參考。那麼,本文就介紹一下怎麼在VB.NET中使用串口控制項與下位機電路板通信。
  • TCP粘包問題真的存在麼?
    起因:初學者在剛開解除tcp/ip協議的時候,一會都會寫一個tcp通信的小demo,大體功能client端負責send,server端負責recv。但是測試中會發現server端常常收到多個信息,經過上網查詢之後莫名的掉到了「TCP粘包」的坑中。
  • 弱電網絡技術之TCP協議靈魂 12 問,總會用得到
    所謂的連接,指的是客戶端和伺服器的連接,在雙方互相通信之前,TCP 需要三次握手建立連接,而 UDP 沒有相應建立連接的過程。可靠性。TCP 花了非常多的功夫保證連接的可靠,這個可靠性體現在哪些方面呢?一個是有狀態,另一個是可控制。
  • golang.org 已經成為歷史
    Go 於2009年11月10日正式開源發布,而 Go 官網 https://golang.org/ 也同時發布上線。
  • linux 下 golang 的 vim 環境配置
    【導讀】vim 做 golang 開發環境,多麼炫酷!還沒嘗試過用 vim 做開發環境的同學可以跟著這篇文檔把環境搭建起來了![root@localhost golang.org]# mkdir x[root@localhost golang.org]# cd .
  • 紅隊作業 | Python實現免殺遠控
    ,說的詳細點就是反彈shell,首先要解決三個問題:1.與服務端建立socket連接2.在服務端命令執行3.服務端把回顯返回給客戶端這裡我們用python實現,聲明一下,涉及到的內容是網絡編程,以下函數實現多半都是我百度的,所以有很多用法以及邏輯的實現都是欠考慮的
  • 中國衛星物聯網星座實現星間雷射通信零突破
    來源:國家國防科技工業局網站北京日報客戶端8月18日消息,記者當日從國家航天局獲悉,近日「行雲二號」01星、02星之間實現了建鏈流程完整、遙測狀態穩定的雙向通信,這意味著「行雲二號」兩顆衛星搭載的雷射通信載荷技術得到成功驗證,我國衛星物聯網星座實現星間雷射通信零的突破。至此,兩顆衛星自今年5月12日發射入軌開展在軌技術測試以來,所有核心技術均得到充分驗證。
  • 面試官問大量的TIME_WAIT 狀態 TCP連接對業務有什麼影響怎麼處理
    . `$ netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}'`3. `ESTABLISHED 1154`4.`tcp4 0 0 127.0.0.1.1080 127.0.0.1.59061 TIME_WAIT`8.
  • 京東商城推iPhone客戶端 掃描條碼即可比價
    用戶只需用iPhone掃描商品條碼,即可實現線下比價功能,Android客戶端也將於本月下旬推出。 關鍵詞:iPhone[73篇]  客戶端[1篇]  京東商城[1篇]  條碼[170篇]         京東商城日前發布了iPhone客戶端,除常用的下單、評價等功能外,還具備比價特色功能。