今天這篇文章主要是針對 Go channel 的重點分析,一開始寫的時候以為範圍不會太大,但洋洋灑灑還是寫破了萬字,成為了一篇覆蓋面較廣和有一定深度的長文分析。
大家覺得不錯的話,歡迎關注煎魚和三連一波 ✍️。
接下來和煎魚一起正式開始 Go channel 的學習之旅!
Go 語言中的一大利器那就是能夠非常方便的使用 go 關鍵字來進行各種並發,而並發後又必然會涉及通信。
Channel 自然而然就成為了 Go 語言開發者中必須要明白明了的一個 「東西」 了,更別提實際工程應用和日常面試了,屬於必知必會。
本文目錄:
什麼是 channel在 Go 語言中,channel 可以稱其為通道,也可以叫管道。channel 主要常見於與 goroutine+select 搭配使用,再結合語錄的描述。可以知道 channel 就是用於 goroutine 的數據通信:
演示代碼如下:
func main() {
ch := make(chan string)
go func() {
ch <- "煎魚"
}()
msg := <-ch
fmt.Println(msg)
}在 goroutine1 中寫入 「煎魚」 到變量 ch 中,goroutine2 監聽變量 ch,並阻塞等待讀取到值 「煎魚」 最終返回,結束流程。
在此 channel 承載著一個銜接器的橋梁:
這也是 channel 的經典思想了,不要通過共享內存來通信,而是通過通信來實現內存共享(Do not communicate by sharing memory; instead, share memory by communicating)。
從模式上來看,其就是在多個 goroutine 藉助 channel 來傳輸數據,實現了跨 goroutine 間的數據傳輸,多者獨立運行,不需要強關聯,更不影響對方的 goroutine 狀態。不存在 goroutine1 對 goroutine2 進行直傳的情況。
這裡思考一個問題,那 goroutine1 和 goroutine2 又怎麼互相知道自己的數據 」到「 了呢?
channel 基本特性在 Go 語言中,channel 的關鍵字為 chan,數據流向的表現方式為 <-,代碼解釋方向是從左到右,據此就能明白通道的數據流轉方向了。
channel 共有兩種模式,分別是:雙向和單向;三種表現方式,分別是:聲明雙向通道:chan T、聲明只允許發送的通道:chan <- T、聲明只允許接收的通道:<- chan T。
channel 中還分為 「無緩衝 channel」 和 「緩衝 channel」。
演示代碼如下:
// 無緩衝
ch1 := make(chan int)
// 緩衝區為 3
ch2 := make(chan int, 3)接下來我們進一步展開這兩類來看。
無緩衝 channel無緩衝的 channel(unbuffered channel),其緩衝區大小則默認為 0。在功能上其接受者會阻塞等待並阻塞應用程式,直至收到通信和接收到數據。
這種常用於兩個 goroutine 間互相同步等待的應用場景:
unbuffered channel(via @William Kennedy)緩衝 channel有緩存的 channel(buffered channel),其緩存區大小是根據所設置的值來調整。在功能上,若緩衝區未滿則不會阻塞,會源源不斷的進行傳輸。當緩衝區滿了後,發送者就會阻塞並等待。而當緩衝區為空時,接受者就會阻塞並等待,直至有新的數據:
buffered channel(via @William Kennedy)在實際的應用場景中,兩者根據業務情況選用就可以了,不需要太過糾結於兩者是否有性能差距,沒意義。
channel 本質channel 聽起來實現了一個非常酷的東西,也是日常工作中常常會被面試官問到的問題。
但其實 channel 並沒有那麼的 "神秘",就是一個環形隊列的配合。
接下來我們一步步的剖開 channel,看看裡面到底是什麼,怎麼實現的跨 goroutine 通信,數據結構又是什麼,兩者又如何實現數據傳輸的?
基本原理本質上 channel 在設計上就是環形隊列。其包含發送方隊列、接收方隊列,加上互斥鎖 mutex 等結構。
channel 是一個有鎖的環形隊列:
數據結構hchan 結構體是 channel 在運行時的具體表現形式:
// src/runtime/chan.go
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
buf:指向長度為 dataqsiz 的底層數組,僅有當 channel 為緩衝型的才有意義。recvq:接受者的 sudog 等待隊列(緩衝區不足時阻塞等待的 goroutine)。在數據結構中,我們可以看到 recvq 和 sendq,其表現為等待隊列,其類型為 runtime.waitq 的雙向鍊表結構:
type waitq struct {
first *sudog
last *sudog
}且無論是 first 屬性又或是 last,其類型都為 runtime.sudog 結構體:
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
...
}sudog 是 Go 語言中用於存放協程狀態為阻塞的 goroutine 的雙向鍊表抽象,你可以直接理解為一個正在等待的 goroutine 就可以了。
在後續的實現原理分析中,基本圍繞著上述數據結構進行大量的討論,建議可以認真思考一下。
channel 實現原理在了解了 channel 的基本原理後,我們進入到與應用工程中更緊密相關的部分,那就是 channel 的四大塊操作,分別是:「創建、發送、接收、關閉」。
我們將針對這四塊進行細緻的分析和講解。因此接下來的內容比較龐大,內容上將分為兩個角度來講述,分別是先從源碼角度進行分析,再進行圖示匯總。以便於大家更好的理解和思考
創建 chan創建 channel 的演示代碼:
ch := make(chan string)其在編譯器翻譯後對應 runtime.makechan 或 runtime.makechan64 方法:
// 通用創建方法
func makechan(t *chantype, size int) *hchan
// 類型為 int64 的進行特殊處理
func makechan64(t *chantype, size int64) *hchan通過前面我們得知 channel 的基本單位是 hchan 結構體,那麼在創建 channel 時,究竟還需要做什麼是呢?
我們一起分析一下 makechan 方法,就能知道了。
源碼如下:
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}創建 channel 的邏輯主要分為三大塊:
當前 channel 不存在緩衝區,也就是元素大小為 0 的情況下,就會調用 mallocgc 方法分配一段連續的內存空間。當前 channel 存儲的類型存在指針引用,就會連同 hchan 和底層數組同時分配一段連續的內存空間。需要注意到一塊特殊點,那就是 channel 的創建都是調用的 mallocgc 方法,也就是 channel 都是創建在堆上的。因此 channel 是會被 GC 回收的,自然也不總是需要 close 方法來進行顯示關閉了。
從整體上來講,makechan 方法的邏輯比較簡單,就是創建 hchan 並分配合適的 buf 大小的堆上內存空間。
發送數據channel 發送數據的演示代碼:
go func() {
ch <- "煎魚"
}()其在編譯器翻譯後對應 runtime.chansend1 方法:
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}其作為編譯後的入口方法,實則指向真正的實現邏輯,也就是 chansend 方法。
前置處理在第一部分中,我們先看看 chan 發送的一些前置判斷和處理:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
// 省略一些調試相關
...
}
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil
}
return c.qcount == c.dataqsiz
}一開始 chansend 方法在會先判斷當前的 channel 是否為 nil。若為 nil,在邏輯上來講就是向 nil channel 發送數據,就會調用 gopark 方法使得當前 Goroutine 休眠,進而出現死鎖崩潰,表象就是出現 panic 事件來快速失敗。
緊接著會對非阻塞的 channel 進行一個上限判斷,看看是否快速失敗。
失敗的場景如下:
若非阻塞且未關閉,同時底層數據 dataqsiz 大小為 0(緩衝區無元素),則會返回失敗。。若是 qcount 與 dataqsiz 大小相同(緩衝區已滿)時,則會返回失敗。上互斥鎖在完成了 channel 的前置判斷後,即將在進入發送數據的處理前,channel 會進行上鎖:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
}上鎖後就能保住並發安全。另外我們也可以考慮到,這種場景會相對依賴單元測試的覆蓋,因為一旦沒考慮周全,漏上鎖了,基本就會出問題。
直接發送在正式開始發送前,加鎖之後,會對 channel 進行一次狀態判斷(是否關閉):
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}這種情況是最為基礎的,也就是當前 channel 有正在阻塞等待的接收方,那麼只需要直接發送就可以了。
緩衝發送非直接發送,那麼就考慮第二種場景,判斷 channel 緩衝區中是否還有空間:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
}會對緩衝區進行判定(qcount 和 dataqsiz 欄位),以此識別緩衝區的剩餘空間。緊接進行如下操作:
調用 chanbuf 方法,以此獲得底層緩衝數據中位於 sendx 索引的元素指針值。調用 typedmemmove 方法,將所需發送的數據拷貝到緩衝區中。數據拷貝後,對 sendx 索引自行自增 1。同時若 sendx 與 dataqsiz 大小一致,則歸 0(環形隊列)。自增完成後,隊列總數同時自增 1。解鎖互斥鎖,返回結果。至此針對緩衝區的數據操作完成。但若沒有走進緩衝區處理的邏輯,則會判斷當前是否阻塞 channel,若為非阻塞,將會解鎖並直接返回失敗。
配合圖示如下:
阻塞發送在進行了各式各樣的層層篩選後,接下來進入阻塞等待發送的過程:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
}
調用 getg 方法獲取當前 goroutine 的指針,用於後續發送數據。調用 acquireSudog 方法獲取 sudog 結構體,並設置當前 sudog 具體的待發送數據信息和狀態。調用 c.sendq.enqueue 方法將剛剛所獲取的 sudog 加入待發送的等待隊列。調用 gopark 方法掛起當前 goroutine(會記錄執行位置),狀態為 waitReasonChanSend,阻塞等待 channel。調用 KeepAlive 方法保證待發送的數據值是活躍狀態,也就是分配在堆上,避免被 GC 回收。配合圖示如下:
在當前 goroutine 被掛起後,其將會在 channel 能夠發送數據後被喚醒:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 從這裡開始喚醒,並恢復阻塞的發送操作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}喚醒 goroutine(調度器在停止 g 時會記錄運行線程和方法內執行的位置)並完成 channel 的阻塞數據發送動作後。進行基本的參數檢查,確保是符合要求的(縱深防禦),接著開始取消 mysg 上的 channel 綁定和 sudog 的釋放。
至此完成所有類別的 channel 數據發送管理。
接收數據channel 接受數據的演示代碼:
msg := <-ch
msg, ok := <-ch兩種方法在編譯器翻譯後分別對應 runtime.chanrecv1 和 runtime.chanrecv2 兩個入口方法,其再在內部再進一步調用 runtime.chanrecv 方法:
需要注意,發送和接受 channel 是相對的,也就是其核心實現也是相對的。因此在理解時也可以結合來看。
前置處理func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}一開始時 chanrecv 方法會判斷其是否為 nil channel。
場景如下:
若 channel 是 nil channel,且為阻塞接收則調用 gopark 方法掛起當前 goroutine。而接下來對於非阻塞模式的 channel 會進行快速失敗檢查,檢測 channel 是否已經準備好接收。
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}其分以下幾種情況:
無緩衝區:循環隊列為 0 及等待隊列 sendq 內沒有 goroutine 正在等待。隨後會對 channel 的 closed 狀態進行判斷,因為 channel 是無法重複打開的,需要確定當前 channel 是否為未關閉狀態。再確定接收失敗,返回。
但若是 channel 已經關閉且不存在緩存數據了,則會清理 ep 指針中的數據並返回。
直接接收當發現 channel 上有正在阻塞等待的發送方時,則直接進行接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
...
}
緩衝接收當發現 channel 的緩衝區中有元素時:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
...
}將會調用 chanbuf 方法根據 recvx 的索引位置取出數據,找到要接收的元素進行處理。若所接收到的數據和所傳入的變量均不為空,則會調用 typedmemmove 方法將緩衝區中的數據拷貝到所傳入的變量中。
最後數據拷貝完畢後,進行各索引項和隊列總數的自增增減,並調用 typedmemclr 方法進行內存數據的清掃。
阻塞接收當發現 channel 上既沒有待發送的 goroutine,緩衝區也沒有數據時。將會進入到最後一個階段阻塞接收:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}這一塊接收邏輯與發送也基本類似,主體就是獲取當前 goroutine,構建 sudog 結構保存當前待接收數據(發送方)的地址信息,並將 sudog 加入等待接收隊列。最後調用 gopark 方法掛起當前 goroutine,等待喚醒。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 被喚醒後從此處開始
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}被喚醒後,將恢復現場,回到對應的執行點,完成最後的掃尾工作。
關閉 chan關閉 channel 主要是涉及到 close 關鍵字:
close(ch)其對應的編譯器翻譯方法為 closechan 方法:
func closechan(c *hchan)
前置處理func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}基本檢查和關閉標誌設置,保證 channel 不為 nil 和未關閉,保證邊界。
釋放接收方在完成了異常邊界判斷和標誌設置後,會將接受者的 sudog 等待隊列(recvq)加入到待清除隊列 glist 中:
func closechan(c *hchan) {
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
...
}所取出並加入的 goroutine 狀態需要均為 _Gwaiting,以保證後續的新一輪調度。
釋放發送方同樣,與釋放接收方一樣。會將發送方也加入到到待清除隊列 glist 中:
func closechan(c *hchan) {
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
...
}
協程調度將所有 glist 中的 goroutine 狀態從 _Gwaiting 設置為 _Grunnable 狀態,等待調度器的調度:
func closechan(c *hchan) {
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}後續所有的 goroutine 允許被重新調度後。若原本還在被動阻塞的發送方或接收方,將重獲自由,後續該幹嘛就去幹嘛了,再跑回其所屬的應用流程。
channel send/recv 分析sendsend 方法承擔向 channel 發送具體數據的功能:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
調用 sendDirect 方法將待發送的數據直接拷貝到待接收變量的內存地址(執行棧)。例如:msg := <-ch 語句,也就是將數據從 ch 直接拷貝到了 msg 的內存地址。調用 sg.g 屬性, 從 sudog 中獲取等待接收數據的 goroutine,並傳遞後續喚醒所需的參數。調用 goready 方法喚醒需接收數據的 goroutine,期望從 _Gwaiting 狀態調度為 _Grunnable。recvrecv 方法承擔在 channel 中接收具體數據的功能:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}該方法在接受上分為兩種情況,分別是直接接收和緩衝接收:
調用 recvDirect 方法,其作用與 sendDirect 方法相對,會直接從發送方的 goroutine 調用棧中將數據拷貝過來到接收方的 goroutine。調用 chanbuf 方法,根據 recvx 索引的位置讀取緩衝區元素,並將其拷貝到接收方的內存地址。拷貝完畢後,對 sendx 和 recvx 索引位置進行調整。最後還是常規的 goroutine 調度動作,會調用 goready 方法來喚醒當前所處理的 sudog 的對應 goroutine。那麼在下一輪調度時,既然已經接收了數據,自然發送方也就會被喚醒。
總結在本文中我們針對 Go 語言的 channel 進行了基本概念的分析和講解,同時還針對 channel 的設計原理和四大操作(創建、發送、接收、關閉)進行了源碼分析和圖示分析。
初步看過一遍後,再翻看。不難發現,Go 的 channel 設計並不複雜,記住他的數據結構就是帶緩存的環形隊列,再加上對稱的 sendq、recvq 等雙向鍊表的輔助屬性,就能勾畫出 channel 的基本邏輯流轉模型。
在具體的數據傳輸上,都是圍繞著 「邊界上下限處理,上互斥鎖,阻塞/非阻塞,緩衝/非緩衝,緩存出隊列,拷貝數據,解互斥鎖,協程調度」 在不斷地流轉處理。在基本邏輯上也是相對重合的,因為發送和接收,創建和關閉總是相對的。
如果更進一步深入探討,還可以圍繞著 CSP 模型、goroutine 調度等進一步的思考和理解。這一塊會在後續的章節中再一步展開。