劉淑娟,愛立信廣州工程師,雲原生愛好者。
這篇文章來源於雲原生社區組織的 Kubernetes 源碼研習社的作業,是個人學習Informer機制、理解Informer各個組件的設計的總結。
背景為什麼Kubernetes需要Informer機制?我們知道Kubernetes各個組件都是通過REST API跟API Server交互通信的,而如果每次每一個組件都直接跟API Server交互去讀取/寫入到後端的etcd的話,會對API Server以及etcd造成非常大的負擔。而Informer機制是為了保證各個組件之間通信的實時性、可靠,並且減緩對API Server和etcd的負擔。
Informer 流程這個流程,建議先看看《From Controller Study Informer》
這裡我們以CoreV1. Pod資源為例子:
第一次啟動Informer的時候,Reflector 會使用 List從API Server主動獲取CoreV1. Pod的所有資源對象信息,通過 resync將資源存放在 Store中
持續使用 Reflector建立長連接,去 Watch API Server發來的資源變更事件
當2 監控到CoreV1.Pod的資源對象有增加刪除修改之後,就把資源對象存放在 DeltaFIFO中,
DeltaFIFO是一個先進先出隊列,只要這個隊列有數據,就被Pop到Controller中, 將這個資源對象存儲至 Indexer中,並且將該資源對象分發至 ShareInformer
Controller會觸發 Process回調函數
打臉所以,我自己之前寫代碼的時候,一直以為是 ShareInformer去主動watch API Server, 而現在正打臉了,是 Reflector做的List&Watch。
ListAndWatch 思考為什麼Kubernetes裡面是使用ListAndWathc呢?我們所知道的其他分布式系統常常使用RPC來觸發行為。
我們來分析下如果不這樣做,而是採用API Server輪詢推送消息給各個組件,或者各個組件輪詢去訪問API Server的話,那麼實時性就得不到保證,並且對API Server造成很大的負載,很有可能需要開啟大量的埠造成埠浪費。
從實時性出發的話:
我們希望是有任何資源的新增/改動/刪除,都需要馬上獲取並且放入消息隊列。可以對應我們Informer中的 Reflector組件,去主動獲取消息,並且放入 DeltaFIFO隊列被消費。
從減輕負載出發的話:
需要上緩存,這裡可以對應我們的 Store組件。
從設計擴展性出發的話:
作為一個「資源管理系統」的Kubernetes,我們的對象數量可能會無線擴大,那麼我們需要設計一個高效擴展的組件,去應對對象的種類無線擴大,並且同一種對象可能會被用戶實例化非常多次的行為。這裡可以對應我們的 ShareInformer。
從消息的可靠性出發的話:
剛剛說了這麼多,都是進行長連接去Watch的,萬一網絡出錯怎麼辦?這個時候我們的List機制就很明顯發揮作用,一旦感知跟API Server中斷,或者第一次啟動,都是使用List機制的, List作為一個短連接去獲取資源信息,Watch 作為長連接去持續接收資源的變更並且處理。(用List&Watch可以保證不會漏掉任何事件)
Watch的實現Watch是通過HTTP 長連接接收API Server發送的資源變更事件,使用的 Chunkerdtransfer coding, 代碼位置 ./staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go,源碼如下
e := streaming.NewEncoder(framer, s.Encoder)
// ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() defer cleanup()
// begin the stream w.Header().Set("Content-Type", s.MediaType) w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) flusher.Flush()我們使用通過 curl來看看, 在 response的 Header中設置 Transfer-Encoding的值是 chunkerd
# curl -i http://127.0.0.1:8001/api/v1/watch/namespaces?watch=yesHTTP/1.1 200 OKCache-Control: no-cache, privateContent-Type: application/jsonDate: Sun, 09 Aug 2020 02:44:07 GMTTransfer-Encoding: chunked
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"...監聽事件 Reflector我的理解,Reflector是實現對指定的類型對象的監控,既包括Kubernetes內置資源,也可以是CRD自定義資源。
數據結構我們來看看Reflector的數據結構, 代碼塊 staging/src/k8s.io/client-go/tools/cache/reflector.go
listerWatcher其實就是從API Server裡面去做List跟Watch的操作去獲取對象的變更。
type Reflector struct { name string // 監控的對象類型,比如Pod expectedType reflect.Type // 存儲 store Store // ListerWatcher是針對某一類對象,比如Pod listerWatcher ListerWatcher period time.Duration resyncPeriod time.Duration ShouldResync func() bool ...}RunRun是循環一直把數據存儲到 DeltaFIFO中。
func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) wait.Until(func() { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.period, stopCh)}也就是說,Reflector是一直在執行ListAndWatch, 除非收到消息stopCh要被關閉,Run才會退出。
ListAndWatch書上把這一段講得很詳細了,我貼這段代碼,是為了給下面的Kubernetes並發的章節用的,這裡用到了 GetResourceVersion setLastSyncResourceVersion等
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List() // to be served from cache and potentially be delayed relative to // etcd contents. Reflector framework will catch up via Watch() eventually. options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error { initTrace := trace.New("Reflector " + r.name + " ListAndWatch") defer initTrace.LogIfLong(10 * time.Second) var list runtime.Object var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() // 先List list, err = r.listerWatcher.List(options) close(listCh) }() select { case <-stopCh: return nil case r := <-panicCh: panic(r) case <-listCh: } if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } initTrace.Step("Objects extracted") if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } initTrace.Step("SyncWith done") r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") return nil }(); err != nil { return err }
resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go func() { resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }()
for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: return nil default: }
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. TimeoutSeconds: &timeoutSeconds, }
w, err := r.listerWatcher.Watch(options) if err != nil { switch err { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) } // If this is "connection refused" error, it means that most likely apiserver is not responsive. // It doesn't make sense to re-list all objects because most likely we will be able to restart // watch where we ended. // If that's the case wait and resend watch request. if urlError, ok := err.(*url.Error); ok { if opError, ok := urlError.Err.(*net.OpError); ok { if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { time.Sleep(time.Second) continue } } } return nil }
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } return nil } }}Kubernetes並發從ListAndWatch的代碼,有一段關於 syncWith的方法,比較重要,原來Kubernetes的並發是通過 ResourceVersion來實現的,每次對這個對象的改動,都會把改對象的 ResourceVersion加一。
二級緩存DeltaFIFO 和 StoreDeltaFIFO我們通過數據結構來理解DeltaFIFO,我們先來理解一下Delta。
代碼塊 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
通過下面的代碼塊,我們可以非常清晰看得出, Delta其實是一個資源對象存儲,保存例如Pod的Added操作等。用白話來說其實就是記錄Kubernetes每一個對象的變化。
type Delta struct { Type DeltaType Object interface{}}
type DeltaType string
const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Sync DeltaType = "Sync")FIFO就比較容易理解了,就是一個先進先出的隊列。也可以看看代碼塊 staging/src/k8s.io/client-go/tools/cache/fifo.go去看他的實現,如下
type Queue interface { Store // 可以看出來Queue是在Store的基礎上擴展了Pop,可以讓對象彈出。這裡如果對比一下Indexer的數據結構發現很有意思,Indexer是在Store的基礎上加了索引,去快速檢索對象 Pop(PopProcessFunc) (interface{}, error) AddIfNotPresent(interface{}) error HasSynced() bool Close()}結合起來,DeltaFIFO其實就是一個先進先出的Kubernetes對象變化的隊列,這個隊列中存儲不同操作類型的同一個資源對象。
DeltaFIFO中的GET方法或者GetByKey都比較簡單,接下來對queueActionLocked()函數重點說明。
queueActionLockedfunc (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { // 拿到對象的Key id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} }
// 把同一個對象的不同的actionType,都添加到newDeltas列表中 newDeltas := append(f.items[id], Delta{actionType, obj}) // 合併去重 newDeltas = dedupDeltas(newDeltas) // 我一開始理解不了,覺得不可能存在<=0的情況,最新的Kubernetes的代碼裡面注釋說了,正常情況下不會出現<=0, 加這個判斷屬於冗餘判斷 if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { delete(f.items, id) } return nil}看看去重的代碼
func dedupDeltas(deltas Deltas) Deltas { n := len(deltas) // 少於2個也就是得一個,不需要合併了,直接返回 if n < 2 { return deltas } a := &deltas[n-1] b := &deltas[n-2] // 這裡,最後調了isDeletionDup,這個是判斷一個資源對象的兩次操作是否都是刪除,如果是,就去重,不需要刪除兩次 if out := isDup(a, b); out != nil { d := append(Deltas{}, deltas[:n-2]...) return append(d, *out) } return deltas}
func isDup(a, b *Delta) *Delta { if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil}之前群裡有人問為什麼dedupDeltas只是去這個列表的倒數一個跟倒數第二個去進行合併去重的操作,這裡說明一下,dedupDeltas是被queueActionLocked函數調用的,而queueActionLocked為什麼我們拿出來講,是因為在Delete/Update/Add裡面去調用了queueActionLocked,合併是對某一個obj的一系列操作,而去重是只針對delete。
我們可以拿一個例子來看看,假設是[obj1]: [add: delta1, update: delta2, delete: delta3, delete: delta3] 在經過queueActionLocked之後會變成[obj1]: [add: delta1, update: delta2, delete: delta3]
消費者方法func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { if f.IsClosed() { return nil, FIFOClosedError }
f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { continue } delete(f.items, id) err := process(item)
if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } return item, err }}LocalStore緩存機制,但LocalStore是被 Lister的 List/Get方法訪問
Share Informer 共享機制從流程上我們說了,因為是 DeltaFIFO把消息分發至 ShareInformer中,因此我們可以用 Inforomer添加自定義的回調函數,也就是我們經常看到的 OnAdd OnUpdaate和 OnDelete
Kubernetes內部的每一個資源都實現了Informer機制,如下是一個Namespace的Informer的例子
代碼塊 staging/src/k8s.io/client-go/informers/core/v1/namespace.go
type NamespaceInformer interface { Informer() cache.SharedIndexInformer Lister() v1.NamespaceLister}Indexer以下是Indexer的數據結構,清晰的看見Indexer繼承了Store接口, 還增加了索引的功能。
type Indexer interface { Store Index(indexName string, obj interface{}) ([]interface{}, error)...}看看我們流程第四個步驟:DeltaFIFO是一個先進先出隊列,只要這個隊列有數據,就被Pop到Controller中, 將這個資源對象存儲至 Indexer中。這個步驟說明了Indexer存儲的數據來源。
我們看看Indexer關鍵的幾個索引函數
// 索引函數,傳入的是對象,返回的是檢索結果的列表,例如我們可以通過IndexFunc去查某個Annotation/label的configmaptype IndexFunc func(obj interface{}) ([]string, error)// 索引函數,key是索引器名詞,value是索引器的實現函數type Indexers map[string]IndexFunc // 索引函數name 對應多個索引鍵 多個對象鍵 真正對象type Indices map[string]Index// 索引緩存,map類型type Index map[string]sets.String總結一下:
Indexers: 索引函數name --> 索引實現函數-->索引key值Indics: 索引函數name --> 對應多個索引key值 --> 每個索引key值對應不同的資源
舉個例子來說明的話:對象Pod有一個標籤app=version1,這裡標籤就是索引鍵,Indexer會把相同標籤的所有Pod放在一個集合裡面,然後我們實現對標籤分類就是我們Indexer的核心內容。
參考《Kubernetes 源碼剖析》第五章
推薦閱讀