Kubernetes Informer 機制源碼解析

2021-02-16 Cloud Native Community

劉淑娟,愛立信廣州工程師,雲原生愛好者。

這篇文章來源於雲原生社區組織的 Kubernetes 源碼研習社的作業,是個人學習Informer機制、理解Informer各個組件的設計的總結。

背景

為什麼Kubernetes需要Informer機制?我們知道Kubernetes各個組件都是通過REST API跟API Server交互通信的,而如果每次每一個組件都直接跟API Server交互去讀取/寫入到後端的etcd的話,會對API Server以及etcd造成非常大的負擔。而Informer機制是為了保證各個組件之間通信的實時性、可靠,並且減緩對API Server和etcd的負擔。

Informer 流程

這個流程,建議先看看《From Controller Study Informer》

這裡我們以CoreV1. Pod資源為例子:

第一次啟動Informer的時候,Reflector 會使用 List從API Server主動獲取CoreV1. Pod的所有資源對象信息,通過 resync將資源存放在 Store中

持續使用 Reflector建立長連接,去 Watch API Server發來的資源變更事件

當2 監控到CoreV1.Pod的資源對象有增加刪除修改之後,就把資源對象存放在 DeltaFIFO中,

DeltaFIFO是一個先進先出隊列,只要這個隊列有數據,就被Pop到Controller中, 將這個資源對象存儲至 Indexer中,並且將該資源對象分發至 ShareInformer

Controller會觸發 Process回調函數

打臉

所以,我自己之前寫代碼的時候,一直以為是 ShareInformer去主動watch API Server, 而現在正打臉了,是 Reflector做的List&Watch。

ListAndWatch 思考

為什麼Kubernetes裡面是使用ListAndWathc呢?我們所知道的其他分布式系統常常使用RPC來觸發行為。

我們來分析下如果不這樣做,而是採用API Server輪詢推送消息給各個組件,或者各個組件輪詢去訪問API Server的話,那麼實時性就得不到保證,並且對API Server造成很大的負載,很有可能需要開啟大量的埠造成埠浪費。

從實時性出發的話:

我們希望是有任何資源的新增/改動/刪除,都需要馬上獲取並且放入消息隊列。可以對應我們Informer中的 Reflector組件,去主動獲取消息,並且放入 DeltaFIFO隊列被消費。

從減輕負載出發的話:

需要上緩存,這裡可以對應我們的 Store組件。

從設計擴展性出發的話:

作為一個「資源管理系統」的Kubernetes,我們的對象數量可能會無線擴大,那麼我們需要設計一個高效擴展的組件,去應對對象的種類無線擴大,並且同一種對象可能會被用戶實例化非常多次的行為。這裡可以對應我們的 ShareInformer。

從消息的可靠性出發的話:

剛剛說了這麼多,都是進行長連接去Watch的,萬一網絡出錯怎麼辦?這個時候我們的List機制就很明顯發揮作用,一旦感知跟API Server中斷,或者第一次啟動,都是使用List機制的, List作為一個短連接去獲取資源信息,Watch 作為長連接去持續接收資源的變更並且處理。(用List&Watch可以保證不會漏掉任何事件)

Watch的實現

Watch是通過HTTP 長連接接收API Server發送的資源變更事件,使用的 Chunkerdtransfer coding, 代碼位置 ./staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go,源碼如下

 e := streaming.NewEncoder(framer, s.Encoder)
// ensure the connection times out timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh() defer cleanup()
// begin the stream w.Header().Set("Content-Type", s.MediaType) w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) flusher.Flush()

我們使用通過 curl來看看, 在 response的 Header中設置 Transfer-Encoding的值是 chunkerd

# curl -i http://127.0.0.1:8001/api/v1/watch/namespaces?watch=yesHTTP/1.1 200 OKCache-Control: no-cache, privateContent-Type: application/jsonDate: Sun, 09 Aug 2020 02:44:07 GMTTransfer-Encoding: chunked
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"...

監聽事件 Reflector

我的理解,Reflector是實現對指定的類型對象的監控,既包括Kubernetes內置資源,也可以是CRD自定義資源。

數據結構

我們來看看Reflector的數據結構, 代碼塊 staging/src/k8s.io/client-go/tools/cache/reflector.go

listerWatcher其實就是從API Server裡面去做List跟Watch的操作去獲取對象的變更。

type Reflector struct {  name string    // 監控的對象類型,比如Pod  expectedType reflect.Type    // 存儲  store Store    // ListerWatcher是針對某一類對象,比如Pod  listerWatcher ListerWatcher  period       time.Duration  resyncPeriod time.Duration  ShouldResync func() bool  ...}

Run

Run是循環一直把數據存儲到 DeltaFIFO中。

func (r *Reflector) Run(stopCh <-chan struct{}) {  klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)  wait.Until(func() {    if err := r.ListAndWatch(stopCh); err != nil {      utilruntime.HandleError(err)    }  }, r.period, stopCh)}

也就是說,Reflector是一直在執行ListAndWatch, 除非收到消息stopCh要被關閉,Run才會退出。

ListAndWatch

書上把這一段講得很詳細了,我貼這段代碼,是為了給下面的Kubernetes並發的章節用的,這裡用到了 GetResourceVersion setLastSyncResourceVersion等

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {  klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)  var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List() // to be served from cache and potentially be delayed relative to // etcd contents. Reflector framework will catch up via Watch() eventually. options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error { initTrace := trace.New("Reflector " + r.name + " ListAndWatch") defer initTrace.LogIfLong(10 * time.Second) var list runtime.Object var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() // 先List list, err = r.listerWatcher.List(options) close(listCh) }() select { case <-stopCh: return nil case r := <-panicCh: panic(r) case <-listCh: } if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) } initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } initTrace.Step("Objects extracted") if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } initTrace.Step("SyncWith done") r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") return nil }(); err != nil { return err }
resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go func() { resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }()
for { // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors select { case <-stopCh: return nil default: }
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. TimeoutSeconds: &timeoutSeconds, }
w, err := r.listerWatcher.Watch(options) if err != nil { switch err { case io.EOF: // watch closed normally case io.ErrUnexpectedEOF: klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) } // If this is "connection refused" error, it means that most likely apiserver is not responsive. // It doesn't make sense to re-list all objects because most likely we will be able to restart // watch where we ended. // If that's the case wait and resend watch request. if urlError, ok := err.(*url.Error); ok { if opError, ok := urlError.Err.(*net.OpError); ok { if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { time.Sleep(time.Second) continue } } } return nil }
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) } return nil } }}

Kubernetes並發

從ListAndWatch的代碼,有一段關於 syncWith的方法,比較重要,原來Kubernetes的並發是通過 ResourceVersion來實現的,每次對這個對象的改動,都會把改對象的 ResourceVersion加一。

二級緩存DeltaFIFO 和 StoreDeltaFIFO

我們通過數據結構來理解DeltaFIFO,我們先來理解一下Delta。

代碼塊 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go

通過下面的代碼塊,我們可以非常清晰看得出, Delta其實是一個資源對象存儲,保存例如Pod的Added操作等。用白話來說其實就是記錄Kubernetes每一個對象的變化。

type Delta struct {  Type   DeltaType  Object interface{}}
type DeltaType string
const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Sync DeltaType = "Sync")

FIFO就比較容易理解了,就是一個先進先出的隊列。也可以看看代碼塊 staging/src/k8s.io/client-go/tools/cache/fifo.go去看他的實現,如下

type Queue interface {  Store    // 可以看出來Queue是在Store的基礎上擴展了Pop,可以讓對象彈出。這裡如果對比一下Indexer的數據結構發現很有意思,Indexer是在Store的基礎上加了索引,去快速檢索對象  Pop(PopProcessFunc) (interface{}, error)  AddIfNotPresent(interface{}) error  HasSynced() bool  Close()}

結合起來,DeltaFIFO其實就是一個先進先出的Kubernetes對象變化的隊列,這個隊列中存儲不同操作類型的同一個資源對象。

DeltaFIFO中的GET方法或者GetByKey都比較簡單,接下來對queueActionLocked()函數重點說明。

queueActionLocked
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {    // 拿到對象的Key  id, err := f.KeyOf(obj)  if err != nil {    return KeyError{obj, err}  }
// 把同一個對象的不同的actionType,都添加到newDeltas列表中 newDeltas := append(f.items[id], Delta{actionType, obj}) // 合併去重 newDeltas = dedupDeltas(newDeltas) // 我一開始理解不了,覺得不可能存在<=0的情況,最新的Kubernetes的代碼裡面注釋說了,正常情況下不會出現<=0, 加這個判斷屬於冗餘判斷 if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { delete(f.items, id) } return nil}

看看去重的代碼

func dedupDeltas(deltas Deltas) Deltas {  n := len(deltas)    // 少於2個也就是得一個,不需要合併了,直接返回  if n < 2 {    return deltas  }  a := &deltas[n-1]  b := &deltas[n-2]    // 這裡,最後調了isDeletionDup,這個是判斷一個資源對象的兩次操作是否都是刪除,如果是,就去重,不需要刪除兩次  if out := isDup(a, b); out != nil {    d := append(Deltas{}, deltas[:n-2]...)    return append(d, *out)  }  return deltas}
func isDup(a, b *Delta) *Delta { if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil}

之前群裡有人問為什麼dedupDeltas只是去這個列表的倒數一個跟倒數第二個去進行合併去重的操作,這裡說明一下,dedupDeltas是被queueActionLocked函數調用的,而queueActionLocked為什麼我們拿出來講,是因為在Delete/Update/Add裡面去調用了queueActionLocked,合併是對某一個obj的一系列操作,而去重是只針對delete。

我們可以拿一個例子來看看,假設是[obj1]: [add: delta1, update: delta2, delete: delta3, delete: delta3] 在經過queueActionLocked之後會變成[obj1]: [add: delta1, update: delta2, delete: delta3]

消費者方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {  f.lock.Lock()  defer f.lock.Unlock()  for {    for len(f.queue) == 0 {            if f.IsClosed() {        return nil, FIFOClosedError      }
f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { continue } delete(f.items, id) err := process(item)
if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } return item, err }}

LocalStore

緩存機制,但LocalStore是被 Lister的 List/Get方法訪問

Share Informer 共享機制

從流程上我們說了,因為是 DeltaFIFO把消息分發至 ShareInformer中,因此我們可以用 Inforomer添加自定義的回調函數,也就是我們經常看到的 OnAdd OnUpdaate和 OnDelete

Kubernetes內部的每一個資源都實現了Informer機制,如下是一個Namespace的Informer的例子

代碼塊 staging/src/k8s.io/client-go/informers/core/v1/namespace.go

type NamespaceInformer interface {  Informer() cache.SharedIndexInformer  Lister() v1.NamespaceLister}

Indexer

以下是Indexer的數據結構,清晰的看見Indexer繼承了Store接口, 還增加了索引的功能。

type Indexer interface {  Store  Index(indexName string, obj interface{}) ([]interface{}, error)...}

看看我們流程第四個步驟:DeltaFIFO是一個先進先出隊列,只要這個隊列有數據,就被Pop到Controller中, 將這個資源對象存儲至 Indexer中。這個步驟說明了Indexer存儲的數據來源。

我們看看Indexer關鍵的幾個索引函數

// 索引函數,傳入的是對象,返回的是檢索結果的列表,例如我們可以通過IndexFunc去查某個Annotation/label的configmaptype IndexFunc func(obj interface{}) ([]string, error)// 索引函數,key是索引器名詞,value是索引器的實現函數type Indexers map[string]IndexFunc // 索引函數name   對應多個索引鍵   多個對象鍵   真正對象type Indices map[string]Index// 索引緩存,map類型type Index map[string]sets.String

總結一下:

Indexers: 索引函數name --> 索引實現函數-->索引key值Indics: 索引函數name --> 對應多個索引key值 --> 每個索引key值對應不同的資源

舉個例子來說明的話:對象Pod有一個標籤app=version1,這裡標籤就是索引鍵,Indexer會把相同標籤的所有Pod放在一個集合裡面,然後我們實現對標籤分類就是我們Indexer的核心內容。

參考

《Kubernetes 源碼剖析》第五章

推薦閱讀


相關焦點

  • Kubernetes 學習筆記之 ServiceAccount TokensController 源碼解析
    在 Kubernetes 學習筆記之 ServiceAccount AdmissionController 源碼解析 文章中,知道一個 ServiceAccount 對象都會引用一個type="kubernetes.io/service-account-token" 的 secret 對象,這個 secret
  • client-go 源碼學習總結
    前言目前在雲原生社區的 Kubernetes 源碼研習社中和廣大學友們共同學習鄭東旭大佬的 Kubernetes 源碼剖析[1]這本書。當前正在開展第一期學習活動,第五章節 client-go 的學習。之所以從這一章節開始學習,主要是考慮到 client-go 在源碼中相對比較獨立,可以單獨閱讀。
  • 深入解析Kubernetes service 概念
    深入解析Kubernetes service 概念Kubernetes在Kubernetes平臺上,Pod是有生命周期,為了可以給客戶端一個固定的訪問端點,因此需要在客戶端和Pod之間添加一個中間層,這個中間層稱之為ServiceService是什麼?
  • Kubernetes CRD 自定義控制器
    go func() {  <-c  close(stop)  <-c  os.Exit(1) // 第二個信號,直接退出 }() return stop}func initClient() (*kubernetes.Clientset, *rest.Config, error) { var
  • Kubernetes學習筆記之LRU算法源碼解析
    Kubernetes學習筆記之LRU算法源碼解析Overview本文章基於k8s release-1.17分支代碼。之前一篇文章學習 Kubernetes學習筆記之ServiceAccount TokensController源碼解析 ,主要學習ServiceAccount有關知識,發現其中使用了LRU Cache,代碼在 L106 。
  • Kubernetes API 安全機制詳解
    運維中對平臺的管理操作都需要通過apiserver提供的功能接口完成,因此Kubernetes總體的API安全防護機制是對用戶訪問操作HTTPS服務接口的控制。JWT的內容如下:Header:{"alg":"RS256","kid":""}Payload:{ "iss": "kubernetes/serviceaccount", "kubernetes.io/serviceaccount/namespace": "default", "kubernetes.io/serviceaccount/secret.name": "test-token-z9tzx
  • CVE-2020-8554:MiTM漏洞影響所有Kubernetes版本
    Kubernetes (簡稱K8s)是是一個開源的,用於管理雲平臺中多個主機上的容器化的應用,Kubernetes的目標是讓部署容器化的應用簡單並且高效,Kubernetes提供了應用部署、規劃、更新、維護的一種機制。
  • 源碼視角,全方位學習Kubernetes scheduler
    12、VolumeNodePredicate> 無13、VolumeZonePredicate> 檢查存儲區域劃分:檢查Node中是否有label:failure-domain.beta.kubernetes.io/zone或者failure-domain.beta.kubernetes.io
  • Kubernetes ELK 日誌收集
    直接在宿主機上安裝,和在kubernetes效果一樣的。**> # 匹配tag為raw.kubernetes.**> @id filter_parser @type parser # multi-format-parser多格式解析器插件 key_name log # 在要解析的記錄中指定欄位名稱。
  • 抖音解析網站源碼 抖音解析保存在哪裡
    抖音解析網站源碼有沒有呢,這是很多有這方面需求的小夥伴們都關心的問題。就讓小編帶大家了解抖音解析保存在哪裡吧~
  • Jetpack源碼解析--ViewModel基本使用及源碼解析
    1.背景Jetpack源碼解析系列文章:1. Android_Jetpack組件---Naviagtion源碼解析2. Jetpack源碼解析—Navigation為什麼切換Fragment會重繪?3. Jetpack源碼解析---用Lifecycles管理生命周期4.
  • 強大的反射功能詳解與應用源碼解析
    JAVA反射機制主要提供了以下功能:於是,我們可以先通過反射獲取對象的類,從而判斷兩個對象是否屬於同一個類;然後獲取對象的成員變量,輪番比較兩個對象的成員變量是否一致。最終,我們將功能改寫為如下所示的形式。
  • Kubernetes-應用部署問題定位和處理
    /kubernetes/issues/6842pods/mypod接下來,要檢查的是apiserver上的Pod是否與要創建的Pod相匹配。3.2 能否通過DNS解析正常解析代理服務對於處於同一個命名空間的容器化應用,可以直接通過代理服務的名稱(mysql-0-svc)訪問MySQL master。
  • Spring-Task源碼解析
    this.registerBeanDefinitionParser("scheduler",newSchedulerBeanDefinitionParser());}schedulerSchedulerBeanDefinitionParser源碼
  • Kubernetes的Local Persistent Volumes使用小記
    歡迎訪問我的GitHubhttps://github.com/zq2599/blog_demos內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;和常見的分布式文件系統相比,本地磁碟故障會導致數據丟失,保存重要數據請勿使用HostPath Volume和Local PV;基本概念說完了,接下來實戰體驗;實戰環境信息作業系統:CentOS Linux release 7.8.2003 (Core)kubernetes
  • 一篇讀懂Kubernetes Scheduler擴展功能
    :註冊預選函數(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go);註冊優選函數(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go)編寫預選和優選處理函數:編寫預選函數(k8s.io
  • 阿里P8架構師力薦的 Java源碼解析及面試合集
    源碼解析和設計思路06 LinkedList 源碼解析07 List 源碼會問哪些面試題08 HasMap源碼解析>09 [x]TreeMap 和 LinkedHashMap核心源碼解析10 Map源碼會問哪些面i試題11 [X]HashSet 、TreeSet 源碼解析
  • Kubernetes持續部署指南
    運行和暴露內部埠4567以在本地啟動伺服器:$ docker run -p 4567:4567 test-image你現在可以測試一個可用的HTTP端點:$ curl -w "\n" localhost:4567hello world :))Semaphore有一個安全的機制以存儲敏感信息
  • Kubernetes scheduler學習筆記
    為了能更好的使用它,所以從源碼的角度,對它進行一個全方位的分析與學習。scheduler的功能不多,但邏輯比較複雜,裡面有很多考慮的因素,總結下來大致有如下幾點:Leader選主,確保集群中只有一個scheduler在工作,其它只是高可用備份實例。通過endpoint:kube-scheduler作為仲裁資源。
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    目前知識星球內已更新的系列文章:1、Flink 源碼解析 —— 源碼編譯運行2、Flink 源碼解析 —— 項目結構一覽3、Flink 源碼解析—— local 模式啟動流程4、Flink 源碼解析 —— standalonesession 模式啟動流程5、Flink 源碼解析 —— Standalone Session