大家好啊,今天網管想給大家介紹一下Go的singleflight包,當然它不是直譯過來的單飛的意思~~!SingleFlight是Go語言sync擴展庫提供的另一種並發原語,那麼SingleFlight是用於解決什麼問題的呢?官方文檔裡的解釋是:
Package singleflight provides a duplicate function call suppression mechanism.
翻譯過來就是:singleflight包提供了一種抑制重複函數調用的機制。
具體到Go程序運行的層面來說,SingleFlight的作用是在處理多個goroutine同時調用同一個函數的時候,只讓一個goroutine去實際調用這個函數,等到這個goroutine返回結果的時候,再把結果返回給其他幾個同時調用了相同函數的goroutine,這樣可以減少並發調用的數量。在實際應用中也是,它能夠在一個服務中減少對下遊的並發重複請求。還有一個比較常見的使用場景是用來防止緩存擊穿。
Go提供的SingleFlightGo擴展庫裡用singleflight.Group結構體類型提供了SingleFlight並發原語的功能。
singleflight.Group類型提供了三個方法:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
Do方法,接受一個字符串Key和一個待調用的函數,會返回調用函數的結果和錯誤。使用Do方法的時候,它會根據提供的Key判斷是否去真正調用fn函數。同一個 key,在同一時間只有第一次調用Do方法時才會去執行fn函數,其他並發的請求會等待調用的執行結果。DoChan方法:類似Do方法,只不過是一個異步調用。它會返回一個通道,等fn函數執行完,產生了結果以後,就能從這個 chan 中接收這個結果。Forget方法:在SingleFlight中刪除一個Key。這樣一來,之後這個Key的Do方法調用會執行fn函數,而不是等待前一個未完成的fn 函數的結果。應用場景了解了Go語言提供的 SingleFlight並發原語都有哪些方法可以調用後 ,下面介紹兩個它的應用場景。
查詢DNS記錄Go語言的net標準庫裡使用的lookupGroup結構,就是Go擴展庫提供的原語singleflight.Group
type Resolver struct {
.
// 源碼地址 https://github.com/golang/go/blob/master/src/net/lookup.go#L151
// lookupGroup merges LookupIPAddr calls together for lookups for the same
// host. The lookupGroup key is the LookupIPAddr.host argument.
// The return values are ([]IPAddr, error).
lookupGroup singleflight.Group
}它的作用是將對相同域名的DNS記錄查詢合併成一個查詢,下面是net庫提供的DNS記錄查詢方法LookupIp使用lookupGroup這個SingleFlight進行合併查詢的相關源碼,它使用的是異步查詢的方法DoChan。
func LookupIP(host string) ([]IP, error) {
addrs, err := DefaultResolver.LookupIPAddr(context.Background(), host)
.
}
func (r *Resolver) lookupIPAddr(ctx context.Context, network, host string) ([]IPAddr, error) {
.
// 使用SingleFlight的DoChan合併多個查詢請求
ch, called := r.getLookupGroup().DoChan(lookupKey, func() (interface{}, error) {
defer dnsWaitGroup.Done()
return testHookLookupIP(lookupGroupCtx, resolverFunc, network, host)
})
if !called {
dnsWaitGroup.Done()
}
select {
case <-ctx.Done():
.
case r := <-ch:
lookupGroupCancel()
if trace != nil && trace.DNSDone != nil {
addrs, _ := r.Val.([]IPAddr)
trace.DNSDone(ipAddrsEface(addrs), r.Shared, r.Err)
}
return lookupIPReturn(r.Val, r.Err, r.Shared)
}
}上面的源碼做了很多刪減,只留了SingleFlight合併查詢的部分,如果有興趣可以去GitHub上看一下完整的源碼,訪問連結https://github.com/golang/go/blob/master/src/net/lookup.go#L261 ,可直接定位到這部分的源碼。
網管是不是很貼心,記得三連啊~!
防止緩存擊穿在項目裡使用緩存時,一個常見的用法是查詢一個數據先去查詢緩存,如果沒有就去資料庫裡查到數據並緩存到Redis裡。那麼緩存擊穿問題是指,高並發的系統中,大量的請求同時查詢一個緩存Key 時,如果這個 Key 正好過期失效,就會導致大量的請求都打到資料庫上,這就是緩存擊穿。用 SingleFlight 來解決緩存擊穿問題再合適不過,這個時候只要這些對同一個 Key 的並發請求的其中一個到資料庫中查詢就可以了,這些並發的請求可以共享同一個結果。
下面是一個模擬用SingleFlight並發原語合併查詢Redis緩存的程序,你可以自己動手測試一下,開10個goroutine去查詢一個固定的Key,觀察一下返回結果就會發現最終只執行了一次Redis查詢。
// 模擬一個Redis客戶端
type client struct {
// ... 其他的配置省略
requestGroup singleflight.Group
}
// 普通查詢
func (c *client) Get(key string) (interface{}, error) {
fmt.Println("Querying Database")
time.Sleep(time.Second)
v := "Content of key" + key
return v, nil
}
// SingleFlight查詢
func (c *client) SingleFlightGet(key string) (interface{}, error) {
v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
return c.Get(key)
})
if err != nil {
return nil, err
}
return v, err
}完整的測試源碼可以點擊閱讀原文,去我的GitHub倉庫下載
實現原理最後我們來看一下singleflight.Group的實現原理,通過它的源碼也是能學到不少用Go語言編程的技巧的。singleflight.Group由一個互斥鎖sync.Mutex和一個映射表組成,每一個 singleflight.call結構體都保存了當前調用對應的信息:
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}下面我們來看看 Do 和 DoChan 方法是怎麼實現的。
Do方法SingleFlight 定義一個call結構體,每個結構體都保存了fn調用對應的信息。
Do方法的執行邏輯是每次調用Do方法都會先去獲取互斥鎖,隨後判斷在映射表裡是否已經有Key對應的fn函數調用信息的call結構體。
當不存在時,證明是這個Key的第一次請求,那麼會初始化一個call結構體指針,增加SingleFlight內部持有的sync.WaitGroup計數器到1。釋放互斥鎖,然後阻塞的等待doCall方法執行fn函數的返回結果當存在時,增加call結構體內代表fn重複調用次數的計數器dups,釋放互斥鎖,然後使用WaitGroup等待fn函數執行完成。call結構體的val 和 err 兩個欄位只會在 doCall方法中執行fn有返回結果後才賦值,所以當 doCall方法 和 WaitGroup.Wait返回時,函數調用的結果和錯誤會返回給Do方法的所有調用者。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
// 存在相同的key, 增加計數
c.dups++
g.mu.Unlock()
c.wg.Wait() //等待這個key對應的fn調用完成
return c.val, c.err, true // 返回fn調用的結果
}
c := new(call) // 不存在key, 是第一個請求, 創建一個call結構體
c.wg.Add(1)
g.m[key] = c //加入到映射表中
g.mu.Unlock()
g.doCall(c, key, fn) // 調用方法
return c.val, c.err, c.dups > 0
}doCall方法會去實際調用fn函數,因為call結構體初始化後forgotten欄位的默認值是false,fn調用有返回後,會把對應的Key刪掉。這樣這輪請求都返回後,下一輪使用同一的Key的請求會重新調用執行一次fn函數。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
if !c.forgotten { // 已調用完,刪除這個key
delete(g.m, key)
}
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
DoChan方法SingleFlight還提供了異步調用DoChan方法,它的執行邏輯和Do方法類似,唯一不同的是調用者不用阻塞等待調用的返回, DoChan方法會創建一個chan Result通道返回給調用者,調用者通過這個通道就能接受到fn函數的結果。這個chan Result通道,在返回給調用者前會先放到call結構體的維護的通知隊列裡,待fn函數返回結果後DoChan方法會把結果發送給通知隊列中的每個通道。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
type Result struct {
Val interface{}
Err error
Shared bool
}
總結學會SingleFlight這個並發原語後,下次在遇到類似在高並發情況下查詢DNS記錄、Redis緩存這樣的場景的時候就可以應用上啦。最後我給你留個思考題吧,上面用SingleFlight查詢Redis緩存的例子使用的是同步阻塞方法Do,你能不能改成使用異步非阻塞的DoChan方法呢?另外能不能給SingleFlightGet增加一個超時返回錯誤的功能呢?
提示一下,使用上下文對象,返回的錯誤是ctx.Err()
可以把在留言裡寫下你的解決方案,最好能附上源碼連結,歡迎把文章分享給你更多的朋友,一起討論。還沒關注公眾號「網管叨bi叨」的抓緊關注呀,每周都會有乾貨技術分享。
關注公眾號,獲取更多精選技術原創文章