上一篇講到reflector監控指定的k8s資源,當監控的資源發生變化時,將資源對象的變化存放到DeltaFIFO隊列中。本篇的內容就是剖析DeltaFIFO隊列,順便再看下goland如何實現FIFO隊列。
隊列client-go中有兩個隊列,一個是FIFO隊列,另一個是DeltaFIFO隊列。我們通過學習其中的FIFO隊列來了解Golang語言中設計FIFO隊列的基本技巧,而學習DeltaFIFO隊列是深入理解Inform機制所需,為後面的文章打下基礎。看下client-go中是如何設計FIFO隊列的,首先有2個Interface接口叫Store和Queue,Queue中包含了Store,而DeltaFIFO和FIFO都是queue接口的實現,Store接口的定義如下:
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error // 用於在List-watch機制中,從api-server那邊list完後,要將對象數據放入DeltaFIFO隊列
Resync() error // 用於定時同步,以免數據不一致
}
Queue接口如下:
type Queue interface {
Store
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
Close()
}
FIFO,大家學過計算機課程的都應該知道,是 First In, First Out的縮寫,意為先入先出,這個通常是隊列的主要特性。看下client-go中FIFO的結構體實現:
源碼摘自:client-go/tools/cache/fifo.go
type FIFO struct { // store 接口的實現
lock sync.RWMutex // 讀寫鎖 針對整個對象
cond sync.Cond // 條件變量
items map[string]interface{} // 存儲key到元素對象的Map
queue []string // 隊列索引,是個數組 保證有序
// 如果已經填充了Replace()插入的第一批項目,或者首先調用了Delete / Add / Update,則populated為true。
populated bool
// 是第一次調用Replace()插入的項目數
initialPopulationCount int
keyFunc KeyFunc //keyFunc像是個對對象的hash函數,獲取對象Id
closed bool // 隊列是否關閉
}
該隊列該如何初始化:
func NewFIFO(keyFunc KeyFunc) *FIFO { // 新建FIFO隊列時,只需傳入keyFunc就行了,keyFunc就是對象的Hash函數,計算對象唯一的對象鍵用的
f := &FIFO{
items: map[string]interface{}{},
queue: []string{},
keyFunc: keyFunc,
}
f.cond.L = &f.lock
return f
}
下面我們看下隊列的增刪改以及pop數據等核心操作的源碼。1、插入元素
func (f *FIFO) Add(obj interface{}) error {
id, err := f.keyFunc(obj) // 拿到對象ID
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock() // 加鎖
defer f.lock.Unlock()
f.populated = true // 設置標誌位
if _, exists := f.items[id]; !exists { // 判斷是否已存在
f.queue = append(f.queue, id) // 不存在,就放入queue數組的最後
}
f.items[id] = obj // 放入Map.,萬一是重複的就是直接替換了
f.cond.Broadcast() // 廣播元素入隊了,等在在pop操作的協程可以去元素了
return nil
}
2、更新操作 就是使用了上面的Add方法
func (f *FIFO) Update(obj interface{}) error {
return f.Add(obj)
}
3、刪除操作
func (f *FIFO) Delete(obj interface{}) error {
id, err := f.keyFunc(obj)// 獲取對象的Key
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock() // 加鎖
defer f.lock.Unlock()
f.populated = true
delete(f.items, id) // 直接從map中刪除元素,那數組中的索引怎麼辦,pop取元素的時候有額外處理
return err
}
4、獲取對象
獲取的是該對象的最新更改
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.keyFunc(obj) // 獲取對象Key
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)// 通過Key檢查對象存不存在隊列
}
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[key] // 從items中拿數據
return item, exists, nil
}
5、判斷隊列是否關閉
func (f *FIFO) IsClosed() bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.closed { // 檢查這個標誌位
return true
}
return false
}
6、Pop函數,隊列中取元素專有的函數
這邊取元素的同時傳入處理元素的函數process。
取出的對象是最新的。
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for { // 一個循環,只在取到元素或者隊列關閉時退出
for len(f.queue) == 0 {// 隊列為空時,就一直等待
if f.closed { // 隊列關閉,就退出循環
return nil, ErrFIFOClosed
}
f.cond.Wait() // 否則一直等待,直到廣播通知隊列有元素了;阻塞
}
id := f.queue[0] // 拿出隊列首位
f.queue = f.queue[1:] // 隊首元素出隊後修正有序數組
if f.initialPopulationCount > 0 {
f.initialPopulationCount-- // 隊列元素總數計數
}
item, ok := f.items[id]
if !ok {// 有可能已經被刪除了,請見delete 函數,之前被刪除的,就不管了
continue
}
delete(f.items, id) // 從Map中刪除
err := process(item) // 用傳進來處理函數process來處理出隊的元素,要是處理失敗,就再塞回隊列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
7、替換隊列元素 傳入參數是list和資源版本
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list)) // 初始化一個map充當之後的隊列
for _, item := range list { // 遍歷list
key, err := f.keyFunc(item) // 獲取對象的Key
if err != nil {
return KeyError{item, err}
}
items[key] = item // 放入items中
}
f.lock.Lock() // 獲取鎖
defer f.lock.Unlock()
if !f.populated { // 未進行replace/add/update等操作
f.populated = true
f.initialPopulationCount = len(items)
}
f.items = items // 替換隊列的所有元素
f.queue = f.queue[:0] // 刪除隊列的之前的排序
for id := range items {
f.queue = append(f.queue, id) // 重新錄入排序
}
if len(f.queue) > 0 {// 排序數組有數據
f.cond.Broadcast()// 廣播
}
return nil
}
8、重新同步 從代碼上看,f.items中的Key可能和f.queue中所包含的Key不一致,所以需要重新同步,讓兩者在key上保持一致。網上的說法是保證不丟事件、數據同步並能及時響應事件。個人看法,覺得這種同步機制是必要的,但是同步頻率需要把控好,不然會影響隊列的效率吧。
func (f *FIFO) Resync() error {
f.lock.Lock() // 獲取鎖
defer f.lock.Unlock()
inQueue := sets.NewString() // 初始化是個Map map[string]Empty
for _, id := range f.queue { // 遍歷索引數組
inQueue.Insert(id) // inQueue複製f.queue
}
for id := range f.items { // 遍歷隊列元素
if !inQueue.Has(id) { // items map中的可以在queue數組中不存在,就添加進去。
f.queue = append(f.queue, id) // 補足f.queue缺失的Id
}
}
if len(f.queue) > 0 {
f.cond.Broadcast() // 廣播
}
return nil
}
上面就基本講完了FIFO的實現。其實如果你項目中要自己實現FIFO,可以把這段抄進去直接使用,client-go都幫你驗證過了,實際使用問題不大的,但是注意一點,就是client-go中的FIFO隊列是針對對象的,重複對象添加是會覆蓋的。要是你的應用不需要這個特性,就需要改改了。
DeltaFIFO什麼是 DeltaFIFOFIFO的意思是先入先出,而Delta的意思是增量。合起來,DeltaFIFO可意為增量先入先出隊列,就是該隊列存儲的數據是增量數據。這邊補充下維基百科增量計算的概念:增量計算是一種軟體功能 。當一部分的數據產生了變化,就僅對該產生變化的部分進行計算和更新,以節省計算時間。相比於簡單地重複計算完整的輸出內容,增量計算能夠顯著地節省計算時間。比如,電子表格會在實現重計算功能時使用增量計算,只重新計算並更新那些含有公式且被直接或間接地改變了的單元格。我想這邊的增量隊列也是考慮到節省計算時間吧。那在client-go中什麼是增量數據,看下源碼中對於Delta的定義:源碼均摘自:client-go/tools/cache/delta_fifo.go
type Delta struct { // 記錄對於對象的增量操作
Type DeltaType // 增量類型
Object interface{} // 對象
}
type DeltaType string // 增量類型是個String
// 有哪些增量類型呢,增刪改,替換,和同步
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
所以所謂的DeltaFIFO就是一個裝有Delta類型和對象數據的先入先出隊列。看下DeltaFIFO結構體的屬性有哪些:
type DeltaFIFO struct {
lock sync.RWMutex // 讀寫鎖,方便讀操作的數據讀取,鎖粒度更細
cond sync.Cond // 條件變量,用於通知和阻塞
items map[string]Deltas //objectkey映射對象的增量數組
queue []string // 保證有序,裡面會放入ObjectKey.從隊列取數據時先從這個數組中拿key,再去items中拿對象
populated bool // 標記隊列是否add/update/delete/replace過了。用處不明
initialPopulationCount int // 第一次replace的元素數量,用處不明
keyFunc KeyFunc // 相當於Hash函數,從一個object中計算出唯一的key
knownObjects KeyListerGetter // knownObjects是新建隊列時傳進來的,並在delete, replace,resync中被使用。是Indexer,是本地存儲,就是list-watch後的對象數據要放入DeltaFIFO隊列中,reflector會將數據從隊列中取出並放入本地存儲Indexer中。之後要是用戶想獲取哪個對象,就直接從本地存儲Indexer中獲取就行了,不用專門去請求api-server了
closed bool // 標記該隊列是否關閉
emitDeltaTypeReplaced bool // Replace() 是否調用過的標記
}
在看完屬性後,看下是如何創建DeltaFIFO隊列的, 這邊提供了兩種方式,核心只用了NewDeltaFIFOWithOptions方法:
// 需要傳入類似哈希函數的KeyFunc和KeyListerGetter,KeyListerGetter是個Indexer本地存儲。後面的文章會講
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ // 調用了下面這個函數
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
// 開始封裝DeltaFIFO
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock // 設置條件變量
return f
}
整個DeltaFIFO隊列的方法有很多,我主要講幾個核心方法:1、添加操作
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock() // 獲取寫鎖
defer f.lock.Unlock() // 釋放寫鎖
f.populated = true // 設置標記位
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj) // 獲取對象的唯一Key
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj}) // 將新的對象增量操作放入Items中對象的增量數組中
newDeltas = dedupDeltas(newDeltas) // 返回修正後的增量數組,數組中的最後兩個增量操作可能時一樣的,這邊需要刪除重複的一個,一般重複的操作都是刪除操作
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id) // 入隊
}
f.items[id] = newDeltas // 放好map
f.cond.Broadcast() // 廣播通知,可能有協程在等待隊列的元素,所以這邊需要廣播通知
} else { // 一般不會發生這種情況
delete(f.items, id) // 刪除該Key
}
return nil
}
2、更新操作
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock() // 上寫鎖
defer f.lock.Unlock() // 解鎖
f.populated = true
return f.queueActionLocked(Updated, obj)
}
這邊的流程和添加操作是一樣的,唯一的不同就是傳入的操作類型是Updated 3、刪除操作 基本邏輯:查看本地存儲和隊列中是否存在該對象,不存在就不繼續刪除操作了,存在,那就添加刪除的增量操作。
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj) // 獲取object的唯一key
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock() // 上寫鎖
defer f.lock.Unlock() // 釋放寫鎖
f.populated = true // 這是標記位
if f.knownObjects == nil { // 本地存儲為空
if _, exists := f.items[id]; !exists {
return nil
}
} else { // 本地存儲不為空
_, exists, err := f.knownObjects.GetByKey(id) // 從本地存儲中查看對象是否存在
_, itemsExist := f.items[id] // 隊列中對象是否存在
if err == nil && !exists && !itemsExist { // 本地存儲不存在和隊列中也不存在
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
return nil
}
}
// exist in items and/or KnownObjects
return f.queueActionLocked(Deleted, obj) // 這個之前講過了,加入刪除的增量操作
}
4、只添加不存在的添加操作
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
deltas, ok := obj.(Deltas) // 轉換成delta
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
id, err := f.KeyOf(deltas.Newest().Object) // 獲取Key
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
if _, exists := f.items[id]; exists { // 對象id只要存在就不添加
return
}
f.queue = append(f.queue, id)
f.items[id] = deltas // 只添加之前未添加過的Key的對象
f.cond.Broadcast()
}
5、 從隊列中取出元素 Pop 輸入參數是PopProcessFunc函數,這個設計挺棒的,就是把處理元素的邏輯帶進來,其他代碼可以完全復用。這個Pop函數的使用是在SharedInformer ,後面的文章會講到。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock() // 這邊嘗試獲取鎖
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed { // 隊列關閉的話,就退出
return nil, ErrFIFOClosed
}
f.cond.Wait() // 沒有數據就一直等待
}
id := f.queue[0] // 獲取隊首元素的key
f.queue = f.queue[1:] // 修正有序隊列
if f.initialPopulationCount > 0 {
f.initialPopulationCount-- // 減1
}
item, ok := f.items[id] // 獲取對象的增量數組,該對象所有的改變都在這了
if !ok { // 如果queue數組中的不存在於items中,說明該對象已經被刪除了
// Item may have been deleted subsequently.
continue
}
delete(f.items, id) // 刪除該id
err := process(item) // 這個設計技巧挺贊的,處理函數是動態傳入的,方便解耦。
if e, ok := err.(ErrRequeue); ok {// 隊列錯誤,就把元素再塞回去?!
f.addIfNotPresent(id, item) // 再塞回隊列
err = e.Err
}
return item, err
}
}
6、替換操作 一般是在List-watch中的list後被使用的,將獲取的數據存入隊列。
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock() // 上鎖
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// keep backwards compat for old clients
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
// Add Sync/Replaced action for each new item.
for _, item := range list {
key, err := f.KeyOf(item)// 獲取Key
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
if f.knownObjects == nil { // 這層邏輯和下面邏輯的區別是什麼?
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items { // 遍歷隊列裡的元素
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
// 相當於一層補充機制,萬一有對象已刪除,但是沒監控到,要做好同步
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// 檢查,要是還有對象未刪除,就刪除她
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
7、Resync操作 會被周期性調用,檢查本地存儲和隊列中的數據是否一致
func (f *DeltaFIFO) Resync() error {
f.lock.Lock() // 獲取寫鎖了
defer f.lock.Unlock()
if f.knownObjects == nil { // 本地存儲為空,就退出
return nil
}
// 重新同步一次 Indexer 緩存數據到 Delta FIFO 隊列中
keys := f.knownObjects.ListKeys() // 獲取本地存儲的key
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key) // 從本地存儲拿對象
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists { // 本地存儲沒有該對象
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil // 退出
}
// 如果我們正在執行Resync(),並且已經有一個事件在排隊等待該對象,那麼我們將忽略該對象的Resync。
// 這是為了避免競爭,即重新同步帶有object的先前值(因為將對象的事件排隊不會觸發更改底層store的
// 競爭)。
// key存在才進行的邏輯
id, err := f.KeyOf(obj) // 又獲取key是什麼意思,上面不是有Key了嘛,不理解,難道是因為本地存儲的key值和這邊的Key值計算方式一樣?不過這樣做只是為了確保使用了正確的key。
if err != nil {
return KeyError{obj, err}
}
// key存在
if len(f.items[id]) > 0 { // 該key的增量記錄不為0,就不需要更新了
return nil // 退出
}
// 該key的增量記錄為空才做同步操作
if err := f.queueActionLocked(Sync, obj); err != nil { // 放入的增量類型是Sync
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
本篇主要講了client-go中FIFO隊列的實現和DeltaFIFO隊列的實現,相信大家對於如何實現FIFO已經有了了解,對於DeltaFIFO隊列,在這看到的只是隊列相關的操作,跟其他模塊的互動比較少,可以看到的是DeltaFIFO隊列用到了本地存儲Indexer(對應代碼中的knownObjects),可以從本地存儲indexer中查數據,但是未涉及knownObjects的寫入。我這裡說下我自己對於deltafifo隊列的理解,DeltaFIFO隊列的作用到底是什麼,直接使用FIFO隊列有什麼不好的地方嗎?我們可以看到對於FIFO隊列items中只存儲對象的最新信息,而過程信息是沒有的。反而DeltaFIFO隊列會完美的保存對象變化的全過程信息,對於需要時刻感知變化過程和變化操作的應用場景,這種DeltaFIFO更合適。後面的文章將介紹本地存儲Indexer,敬請期待。