並發編程--用SingleFlight合併重複請求

2021-02-19 代碼與遠方

大家好啊,今天網管想給大家介紹一下Go的singleflight包,當然它不是直譯過來的單飛的意思~~!SingleFlight是Go語言sync擴展庫提供的另一種並發原語,那麼SingleFlight是用於解決什麼問題的呢?官方文檔裡的解釋是:

Package singleflight provides a duplicate function call suppression mechanism.

翻譯過來就是:singleflight包提供了一種抑制重複函數調用的機制。

具體到Go程序運行的層面來說,SingleFlight的作用是在處理多個goroutine同時調用同一個函數的時候,只讓一個goroutine去實際調用這個函數,等到這個goroutine返回結果的時候,再把結果返回給其他幾個同時調用了相同函數的goroutine,這樣可以減少並發調用的數量。在實際應用中也是,它能夠在一個服務中減少對下遊的並發重複請求。還有一個比較常見的使用場景是用來防止緩存擊穿。

Go提供的SingleFlight

Go擴展庫裡用singleflight.Group結構體類型提供了SingleFlight並發原語的功能。

singleflight.Group類型提供了三個方法:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

func (g *Group) Forget(key string)

Do方法,接受一個字符串Key和一個待調用的函數,會返回調用函數的結果和錯誤。使用Do方法的時候,它會根據提供的Key判斷是否去真正調用fn函數。同一個 key,在同一時間只有第一次調用Do方法時才會去執行fn函數,其他並發的請求會等待調用的執行結果。DoChan方法:類似Do方法,只不過是一個異步調用。它會返回一個通道,等fn函數執行完,產生了結果以後,就能從這個 chan 中接收這個結果。Forget方法:在SingleFlight中刪除一個Key。這樣一來,之後這個Key的Do方法調用會執行fn函數,而不是等待前一個未完成的fn 函數的結果。應用場景

了解了Go語言提供的 SingleFlight並發原語都有哪些方法可以調用後 ,下面介紹兩個它的應用場景。

查詢DNS記錄

Go語言的net標準庫裡使用的lookupGroup結構,就是Go擴展庫提供的原語singleflight.Group

type Resolver struct {
  .
 // 源碼地址 https://github.com/golang/go/blob/master/src/net/lookup.go#L151
 // lookupGroup merges LookupIPAddr calls together for lookups for the same
 // host. The lookupGroup key is the LookupIPAddr.host argument.
 // The return values are ([]IPAddr, error).
 lookupGroup singleflight.Group
}

它的作用是將對相同域名的DNS記錄查詢合併成一個查詢,下面是net庫提供的DNS記錄查詢方法LookupIp使用lookupGroup這個SingleFlight進行合併查詢的相關源碼,它使用的是異步查詢的方法DoChan。

func LookupIP(host string) ([]IP, error) {
 addrs, err := DefaultResolver.LookupIPAddr(context.Background(), host)
  .
}

func (r *Resolver) lookupIPAddr(ctx context.Context, network, host string) ([]IPAddr, error) {
  .
  // 使用SingleFlight的DoChan合併多個查詢請求
 ch, called := r.getLookupGroup().DoChan(lookupKey, func() (interface{}, error) {
  defer dnsWaitGroup.Done()
  return testHookLookupIP(lookupGroupCtx, resolverFunc, network, host)
 })
 if !called {
  dnsWaitGroup.Done()
 }
 
 select {
 case <-ctx.Done():
  .
 case r := <-ch:
  lookupGroupCancel()
  if trace != nil && trace.DNSDone != nil {
   addrs, _ := r.Val.([]IPAddr)
   trace.DNSDone(ipAddrsEface(addrs), r.Shared, r.Err)
  }
  return lookupIPReturn(r.Val, r.Err, r.Shared)
 }
}

上面的源碼做了很多刪減,只留了SingleFlight合併查詢的部分,如果有興趣可以去GitHub上看一下完整的源碼,訪問連結https://github.com/golang/go/blob/master/src/net/lookup.go#L261 ,可直接定位到這部分的源碼。

網管是不是很貼心,記得三連啊~!

防止緩存擊穿

在項目裡使用緩存時,一個常見的用法是查詢一個數據先去查詢緩存,如果沒有就去資料庫裡查到數據並緩存到Redis裡。那麼緩存擊穿問題是指,高並發的系統中,大量的請求同時查詢一個緩存Key 時,如果這個 Key 正好過期失效,就會導致大量的請求都打到資料庫上,這就是緩存擊穿。用 SingleFlight 來解決緩存擊穿問題再合適不過,這個時候只要這些對同一個 Key 的並發請求的其中一個到資料庫中查詢就可以了,這些並發的請求可以共享同一個結果。

下面是一個模擬用SingleFlight並發原語合併查詢Redis緩存的程序,你可以自己動手測試一下,開10個goroutine去查詢一個固定的Key,觀察一下返回結果就會發現最終只執行了一次Redis查詢。

// 模擬一個Redis客戶端
type client struct {
 // ... 其他的配置省略
 requestGroup singleflight.Group
}

// 普通查詢
func (c *client) Get(key string) (interface{}, error) {
 fmt.Println("Querying Database")
 time.Sleep(time.Second)
 v := "Content of key" + key
 return  v, nil
}

// SingleFlight查詢
func (c *client) SingleFlightGet(key string) (interface{}, error) {
 v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
  return c.Get(key)

 })
 if err != nil {
  return nil, err
 }
 return v, err
}

完整的測試源碼可以點擊閱讀原文,去我的GitHub倉庫下載

實現原理

最後我們來看一下singleflight.Group的實現原理,通過它的源碼也是能學到不少用Go語言編程的技巧的。singleflight.Group由一個互斥鎖sync.Mutex和一個映射表組成,每一個 singleflight.call結構體都保存了當前調用對應的信息:

type Group struct {
 mu sync.Mutex
 m  map[string]*call
}

type call struct {
 wg sync.WaitGroup

 val interface{}
 err error

 dups  int
 chans []chan<- Result
}

下面我們來看看 Do 和 DoChan 方法是怎麼實現的。

Do方法

SingleFlight 定義一個call結構體,每個結構體都保存了fn調用對應的信息。

Do方法的執行邏輯是每次調用Do方法都會先去獲取互斥鎖,隨後判斷在映射表裡是否已經有Key對應的fn函數調用信息的call結構體。

當不存在時,證明是這個Key的第一次請求,那麼會初始化一個call結構體指針,增加SingleFlight內部持有的sync.WaitGroup計數器到1。釋放互斥鎖,然後阻塞的等待doCall方法執行fn函數的返回結果當存在時,增加call結構體內代表fn重複調用次數的計數器dups,釋放互斥鎖,然後使用WaitGroup等待fn函數執行完成。

call結構體的val 和 err 兩個欄位只會在 doCall方法中執行fn有返回結果後才賦值,所以當 doCall方法 和 WaitGroup.Wait返回時,函數調用的結果和錯誤會返回給Do方法的所有調用者。


  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
      g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
      // 存在相同的key, 增加計數
      c.dups++
      g.mu.Unlock()
      c.wg.Wait() //等待這個key對應的fn調用完成
      return c.val, c.err, true // 返回fn調用的結果
    }
    c := new(call) // 不存在key, 是第一個請求, 創建一個call結構體
    c.wg.Add(1)
    g.m[key] = c //加入到映射表中
    g.mu.Unlock()
  

    g.doCall(c, key, fn) // 調用方法
    return c.val, c.err, c.dups > 0
  }

doCall方法會去實際調用fn函數,因為call結構體初始化後forgotten欄位的默認值是false,fn調用有返回後,會把對應的Key刪掉。這樣這輪請求都返回後,下一輪使用同一的Key的請求會重新調用執行一次fn函數。


  func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()
  

    g.mu.Lock()
    if !c.forgotten { // 已調用完,刪除這個key
      delete(g.m, key)
    }
    for _, ch := range c.chans {
      ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
  }

DoChan方法

SingleFlight還提供了異步調用DoChan方法,它的執行邏輯和Do方法類似,唯一不同的是調用者不用阻塞等待調用的返回, DoChan方法會創建一個chan Result通道返回給調用者,調用者通過這個通道就能接受到fn函數的結果。這個chan Result通道,在返回給調用者前會先放到call結構體的維護的通知隊列裡,待fn函數返回結果後DoChan方法會把結果發送給通知隊列中的每個通道。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 ch := make(chan Result, 1)
 g.mu.Lock()
 if g.m == nil {
  g.m = make(map[string]*call)
 }
 if c, ok := g.m[key]; ok {
  c.dups++
  c.chans = append(c.chans, ch)
  g.mu.Unlock()
  return ch
 }
 c := &call{chans: []chan<- Result{ch}}
 c.wg.Add(1)
 g.m[key] = c
 g.mu.Unlock()

 go g.doCall(c, key, fn)

 return ch
}

type Result struct {
 Val    interface{}
 Err    error
 Shared bool
}

總結

學會SingleFlight這個並發原語後,下次在遇到類似在高並發情況下查詢DNS記錄、Redis緩存這樣的場景的時候就可以應用上啦。最後我給你留個思考題吧,上面用SingleFlight查詢Redis緩存的例子使用的是同步阻塞方法Do,你能不能改成使用異步非阻塞的DoChan方法呢?另外能不能給SingleFlightGet增加一個超時返回錯誤的功能呢?

提示一下,使用上下文對象,返回的錯誤是ctx.Err()

可以把在留言裡寫下你的解決方案,最好能附上源碼連結,歡迎把文章分享給你更多的朋友,一起討論。還沒關注公眾號「網管叨bi叨」的抓緊關注呀,每周都會有乾貨技術分享。

關注公眾號,獲取更多精選技術原創文章


相關焦點

  • 為什麼用 Java —— 關於並發編程
    前幾天發完《聊聊 Ruby on Rails》那篇文章後,有朋友問到:後臺準備考慮從 Ruby 遷移,問有沒有什麼推薦的語言,尤其是主要需求是大規模高並發,便於維護升級。和陶師兄每次吃飯聊天都覺得可以從他那學習到很多他多年在 Infrasturcture 上積累下來的寶貴經驗。本來這個話題其實想讓他自己寫的,可是貌似很多牛人對寫文章還是有點惰性的,所以今天很多的內容來自於陶濤師兄,不過還是由我操筆了。提高並發的兩種模式提高高並發其實有兩種大思路。
  • Python並發編程初步
    為了提高程序的效率,一個方面改變程序的順序執行,用異步方式,防止由於某個耗時步驟,而影響後續程序的執行。另一個方面是採用並發方式執行,重複利用多核CPU優勢加速執行。關於並發編程大家可能比較熟悉的是Golang的協程、通道和Node.js 的async.parallel異步並發編程。
  • 「原創」Java並發編程系列02|並發編程三大核心問題
    要快速準確的發現並解決這些問題,首先就是要弄清並發編程的本質,並發編程要解決的是什麼問題。本文將帶你深入理解並發編程要解決的三大問題:原子性、可見性、有序性。補充知識硬體的發展中,一直存在一個矛盾,CPU、內存、I/O設備的速度差異。
  • 「原創」Java並發編程系列09|基礎乾貨
    本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫本文為何適原創並發編程系列第9篇。現在,我們進入正題:介紹並發編程的基礎性概念。
  • Alibaba架構師從零開始,一步一步帶你進入並發編程的世界
    雲計算的時代,對系統的高性能、高並發要求更高。所以,深入了解和掌握Java的多線程機制原理,非常有用,非常必要。筆記儘量減少「噦嗦"式的文字語言,全部用Demo式案例來講解技術點的實現,使讀者看到代碼及運行結果後就可以知道此項目要解決的是什麼問題。類似於網絡中Blog的風格,可讓讀者用最短的時間學會此知識點,明白此知識點如何應用,以及在使用時要避免什麼。
  • Elixir 與 Go 多角度對比:背景+編程風格+並發處理
    下面通過它們的背景、編程風格及並發的處理對比這兩門語言。Go/Golang自2009年起由Google研發,以二進位文件( 編譯後)的形式運行在部署的平臺上。 最開始的時候,它被作為一種嘗試去 創建一種新的程式語言, 以解決其他程式語言的主要弊端,並保持其優勢。
  • 大話程序猿眼裡的高並發
    ,有很多用戶同時的訪問URL地址,比如:淘寶的雙11,雙12,就會產生高並發,如貼吧的爆吧,就是惡意的高並發請求,也就是DDOS攻擊,再屌絲點的說法就像玩擼啊擼被ADC暴擊了一樣,那傷害你懂得(如果你看懂了,這個說法說明是正在奔向人生巔峰的屌絲。
  • 原創】Java並發編程系列01|開篇獲獎感言
    ,他剛工作時的並發編程第一原則就是不要寫並發程序。所以,並發編程已經成為一項必備技能。並發編程是Java語言的重要特性之一,它能使複雜的代碼變得更簡單,從而極大地簡化複雜系統的開發。並發編程可以充分發揮多處理器系統的強大計算能力,隨著處理器數量的持續增長,如何高效的並發變得越來越重要。
  • Java並發編程之支持並發的list集合你知道嗎
    Java並發編程之-list集合的並發.我們都知道Java集合類中的arrayList是線程不安全的。那麼怎麼證明是線程不安全的呢?怎麼解決在並發環境下使用安全的list集合類呢?本篇是《凱哥(凱哥Java:kagejava)並發編程學習》系列之《並發集合系列》教程的第一篇:本文主要內容:怎麼證明arrayList不是線程安全的?怎麼解決這個問題?以及遇到問題解決的四個步驟及從源碼來分析作者思路。一:怎麼證明arrayList在並發情況下是線程不安全的呢?
  • 徹底理解Java並發編程原理!
    Java並發編程為四大部分:計算機並發基礎知識、JDK內置並發框架、JDK並發包剖析以及其它並發知識。具體包括線程的狀態、Java線程調度策略、線程優先級、並發模型、悲觀鎖樂觀鎖、JDK各種同步器、JDK內置AQS同步器、線程與IO、Java線程與JVM線程、阻塞與喚醒機制、JDK並發包各種工具剖析、自旋、JDK內置並發鎖、CAS、synchronized、線程池、線程之間的協作等並發方面知識及原理進行深入淺出的講解。
  • 字節跳動面試官:請用JS實現Ajax並發請求控制
    今天這道是字節跳動的:實現一個批量請求函數 multiRequest(urls, maxNum),要求如下:• 要求最大並發數 maxNum• 每當有一個請求返回,就留下一個空位,可以增加新的請求• 所有請求完成後,結果按照 urls 裡面的順序依次打出
  • Java並發編程學習前期知識上篇
    Java並發編程-前期準備知識-上我們先來看看幾個大廠真實的面試題:從上面幾個真實的面試問題來看,我們可以看到大廠的面試都會問到並發相關的問題。所以Java並發,這個無論是面試還是在工作中,並發都是會遇到的。
  • 高並發的核心技術 - 冪等的實現方案
    前端重複提交選中的數據,應該後臺只產生對應這個數據的一個反應結果。 2. 我們發起一筆付款請求,應該只扣用戶帳戶一次錢,當遇到網絡重發或系統bug重發,也應該只扣一次錢; 3. 發送消息,也應該只發一次,同樣的簡訊發給用戶,用戶會哭的; 4. 創建業務訂單,一次業務請求只能創建一個,創建多個就會出大問題。
  • 大話程序猿眼裡的高並發 - OSCHINA - 中文開源技術交流社區
    並發下的數據處理:通過表設計,如:記錄表添加唯一約束,數據處理邏輯使用事物防止並發下的數據錯亂問題通過服務端鎖進程防止包並發下的數據錯亂問題這裡主要講述的是在並發請求下的數據邏輯處理的接口,如何保證數據的一致性和完整性,這裡的並發可能是大量用戶發起的,也可能攻擊者通過並發工具發起的並發請求如例子:通過表設計防止並發導致數據錯亂需求點
  • Java多線程並發編程中並發容器第二篇之List的並發類講解
    Java多線程並發編程中並發容器第二篇之List的並發類講解概述本文我們將詳細講解list對應的並發容器以及用代碼來測試ArrayList、vector以及CopyOnWriteArrayList在100個線程向list中添加1000個數據後的比較
  • Java並發編程:synchronized
  • JAVA並發編程:線程並發工具類Callable、Future 和FutureTask的使用
    更多精彩內容請關注「菜鳥技術棧」微信公眾號CSDN:https://blog.csdn.net/lspj201007186/article/details/106247283往期內容:JAVA並發編程:線程並發工具類CountDownLatch與CyclicBarrier的作用、應用場景和實戰
  • 某廠內部並發編程神仙筆記大全_PDF
    走過路過不要錯過內容介紹所謂並發編程是指在一臺處理器上「同時」處理多個任務。並發是在同一實體上的多個事件。多個事件在同一時間間隔發生。一直以來,硬體的發展極其迅速,在多核的CPU的背景下,催生了並發編程的趨勢,通過並發編程的形式可以將多核CPU的計算能力發揮到極致,性能得到提升。面對複雜業務模型,並行程序會比串行程序更適應業務需求,而並發編程更能吻合這種業務拆分。正是因為這些優點,使得多線程技術能夠得到重視,這個技術也是一名CS學習者應該掌握的。
  • Python 3.8異步並發編程
    有效的提高程序執行效率的兩種方法是異步和並發,Golang,node.js之所以可以有很高執行效率主要是他們的協程和異步並發機制。實際上異步和並發是每一種現代語言都在追求的特性,當然Python也不例外,今天我們就講講Python 3中的異步並發編程。
  • 並發編程 71 道題及答案全送上!
    請求與保持條件:一個進程因請求資源而阻塞時,對已獲得的資源保持不放。 不剝奪條件:進程已獲得資源,在末使用完之前,不能強行剝奪。  循環等待條件:若干進程之間形成一種頭尾相接的循環等待資源關係。活鎖:任務或者執行者沒有被阻塞,由於某些條件沒有滿足,導致一直重複嘗試、失敗、嘗試、失敗。