導語 | Kafka作為一款性能優秀的消息隊列,主要用於異步、削峰、解耦處理,在分布式事務中有著廣泛地應用,但仍有很多開發者在運用過程中存在疑惑。文本將為大家由淺入深剖析Kafka基礎原理以及它的消息可靠性策略,幫助大家理解這一技術知識。文章作者:張璇,騰訊應用開發工程師。
一、背景
部門的開發同學最近在開發一個活動的過程中,需要關注大量的應用後臺邏輯,捕捉各種事件的觸發。在設計時打算採用Kafka消息隊列進行業務邏輯的解耦,這樣活動開發和後臺開發同學的工作就分離開了。但是使用的同學不是很熟悉其原理,擔心以下幾個問題:
我什麼業務場景下使用消息隊列?我發消息的時候,需要等ack嗎?我發了消息之後,消費者一定會收到嗎?申請騰訊雲的Kafka實例後,各種參數怎麼設置呀?遇到各種故障時,我的消息會不會丟?消費者側會收到多條消息嗎?消費者svr重啟後消息會丟失嗎?這些問題都很正常,在開始接觸和使用時總會有這樣或那樣的問題。 一般情況下,不做了解,使用各種默認的推薦值,也是可以work的。 但是我們要優雅的提升自己的姿(知)勢(識)。 學習其背後的原理,至少在遇到一般的問題時,能夠分析和處理問題,做到心中有數。
二、什麼時候使用消息隊列
簡單來說,有3個關鍵詞, 異步、削峰、解耦。 可以理解為:
我做完了,後面的我不管了;工作太多了,先放一放我慢慢處理;怎麼產生的我不管/怎麼處理我不管。以下圖為例:
用戶提交評論中, 寫入資料庫後,存在需要捕捉評論事件的多個邏輯步驟。 如果在接口處理過程中,順序的處理不同的步驟,非常繁瑣。
我們可以批量地通知各個步驟(異步),無需返回直接處理當次的支付其他邏輯(解耦)。 看起來就清爽多了。另外,消息隊列也可以作為緩存暫存發出的消息,不再需要考慮調用各個步驟時時延邏輯的異常場景。
附註:本文以講解Kafka中的可靠性設計為例,其它消息隊列的選型暫不涉及。
三、Kafka基本概念
在回答文章前面的問題之前,需要簡單介紹一下各種概念。 Kafka從拓撲上分有如下角色:
Consumer : 消費者,一般以API形式存在於各個業務svr中;Producer : 生產者,一般以API形式存在於各個業務svr中;Kafka broker : Kafka集群中的伺服器,topic裡的消息數據存在上面。
Producer採用發送push的方式將消息發到broker上,broker存儲後。由consumer採用pull模式訂閱並消費消息。
如圖所示,Kafka從存儲結構上,有如下角色:
Topic:Kafka處理的消息的邏輯大類集合,可以理解為表。寫入不同的topic即寫入不同的表。Partition: Topic下的物理分組,1個topic可以分為多個partition, 每個partition是一個有序的隊列(大文件)。Partition中每一條消息都有一個有序的offset。Msg: 消息,通信的基本單位。每個msg在topic下的不同partiton僅有一份,在partition中有一個唯一的offset用於定位。Replica: 副本,partition的數據冗餘備份,用於實現分布式的數據可靠性,但引入了不同副本間的數據一致性問題,帶來了一定的複雜度。Leader/follower: replica的角色,leader replica 用來提供該partition的讀寫服務。Follower 不停地從leader側同步寫入的消息。它們之間的消息狀態採用一致性策略來解決。四、Kakfa 的存儲格式
為了方便後文更好的理解broker上的消息狀態一致性策略,需要再簡單介紹一下消息的存儲格式。
當Producer 發送一條消息到broker中, 會根據分配 partition 規則選擇被存儲到哪一個 partition, 如果 partition 規則設置的合理,消息會均勻地分布到不同的 partition 裡,這樣就實現了水平擴展。
Pruducer可以認為partition是一個大的串行文件,msg存儲時被分配一個唯一的offset。Offset是一個邏輯意義上的偏移,用於區分每一條消息。
而partition本身作為文件,可以有多個多個副本replica(leader/follower)。多個replica分布在在不同的broker上。
如果要回答如何在broker之間保證存儲的消息和狀態不會丟失,就要回答broker之間的各個replica的消息狀態一致性如何解決,包括producer已經提交了哪些消息,哪些消息已經落地,哪些消息在節點故障後不會丟失。
五、異步發送時的消息可靠性保證
回到文章開頭提到的幾個問題,在使用Kafka消息隊列做異步發送時,如何保證消息的可靠性? 這 裡可以分為3個部分講解。
1. 生產者的可靠性保證
回答生產者的可靠性保證,即回答:
發消息之後有沒有ack?發消息收到ack後,是不是消息就不會丟失了?而Kafka通過配置來指定producer生產者在發送消息時的ack策略:
Request.required.acks=-1 (全量同步確認,強可靠性保證);Request.required.acks = 1(leader 確認收到, 默認);Request.required.acks = 0 (不確認,但是吞吐量大)。如果想實現Kafka配置為 CP(Consistency & Partition tolerance) 系統, 配置需要如下:
request.required.acks=-1min.insync.replicas = ${N/2 + 1}unclean.leader.election.enable = false
如圖所示,在acks=-1 的情況下,新消息只有被ISR中的所有 follower(f1和f2, f3) 都從leader複製過去才會回ack, ack後,無論哪種機器故障情況(全部或部分), 寫入的msg4,都不會丟失, 消息狀態滿足一致性C 要求。
正常情況下,所有follower複製完成後,leader回producer ack。
異常情況下,如果當數據發送到 leader後部分副本(f1和f2同步), leader掛了?此時任何 follower 都有可能變成新的 leader, producer 端會得到返回異常,producer 端會重新發送數據,但這樣數據可能會重複(但不會丟失), 暫不考慮數據重複的情況。
min.insync.replicas 參數用於保證當前集群中處於正常同步狀態的副本follower數量,當實際值小於配置值時,集群停止服務。如果配置為 N/2+1, 即多一半的數量,則在滿足此條件下,通過算法保證強一致性。當不滿足配置數時,犧牲可用性即停服。
異常情況下,leader掛掉,此時需要重新從follower選舉leader。可以為f2或者f3。
如果選舉f3為新leader, 則可能會發生消息截斷,因為f3還未同步msg4的數據。Kafka通過unclean.leader.election.enable來控制在這種情況下,是否可以選舉f3為leader。舊版本中默認為true,在某個版本下已默認為false,避免這種情況下消息截斷地出現。
通過ack和min.insync.replicas和unclean.leader.election.enable的配合,保證在Kafka配置為CP系統時,要麼不工作,要麼得到ack後,消息不會丟失且消息狀態一致。
min.insync.replicas 參數默認值為1,即滿足高可用性,只要有1臺能工作即可。但此時可工作的broker狀態不一定正確。
如果想實現Kafka配置為AP(Availability & Partition tolerance)系統:
request.required.acks=1min.insync.replicas = 1unclean.leader.election.enable = false當配置為acks=1 時,即leader接收消息後回ack,這時會出現消息丟失的問題:如果 leader接受到了 第4 條消息,此時還沒有同步到 follower中,leader機器掛了,其中一個follower被選為 leader, 則 第 4 條消息丟失了。
當然這個也需要unclean.leader.election.enable參數配置為false來配合。但是leader回ack的情況下,follower未同步的概率會大大提升。
通過producer策略的配置和Kafka集群通用參數的配置,可以針對自己的業務系統特點來進行合理的參數配置,在通訊性能和消息可靠性下尋得某種平衡。
2. Broker的可靠性保證
消息通過producer發送到broker之後,還會遇到很多問題:
Partition leader 寫入成功,follower什麼時候同步?Leader寫入成功,消費者什麼時候能讀到這條消息?Leader寫入成功後,leader重啟,重啟後消息狀態還正常嗎?Leader重啟,如何選舉新的leader?這些問題集中在:消息落到broker後,集群通過何種機制來保證不同副本間的消息狀態一致性。
3. Kafka消息備份和同步
Kafka通過分區的多副本策略來解決消息的備份問題。通過HW和LEO的標識,來對應ISR和OSR的概念,用於類比共識性算法解決數據同步一致性的問題。
分區多副本即前文提到的Partition 的replica(副本) 分布在跟 partition 不相同的機器上, 通過數據冗餘保證故障自動轉移。而不同副本的狀態形成了ISR和OSR的概念。
ISR : leader 副本保持一定同步的 follower 副本, 包括 leader 副本自己,叫 In Sync Replica;AR: 所有副本 (replicas) 統稱為 assigned replicas, 即 AR;OSR: follower 同 leader 同步數據有一些延遲的節點。ISR是Kafka的同步策略中獨有的概念,區別於raft等共識性算法。Raft要求集群中要求N/2+1太正常,其在這種條件下通過複雜的算法保證選舉出的新leader符合一致性狀態。
而kafka的ISR同步策略,通過ISR列表的可伸縮性和HW&LEO更新,一定程度上解決了消息一致性和吞吐性能之間的平衡。
ISR通過HW和LEO的概念表示消息的同步狀態:
HW : Highwatermark, 俗稱高水位,它表示了一個特定的消息偏移量(offset), 在一個parttion中consumer只能拉取這個 offset 之前的消息(此 offset 跟 consumer offset 不是一個概念) ;LEO: LogEndOffset , 日誌末端偏移量, 用來表示當前日誌文件中下一條寫入消息的offset;leader HW : 該Partititon所有副本的LEO最小值;follower HW : min(follower自身LEO 和 leader HW);Leader HW = 所有副本LEO最小值;Follower HW = min(follower 自身 LEO 和 leader HW);Leader不僅保存了自己的HW & LEO, 還保存了遠端副本的HW & LEO。
簡單來說,每個副本都有HW和LEO的存儲,而leader不但保存自己的HW和LEO, 還保存了每個遠端副本的LEO,用於在自身的HW更新時計算值。
可以看出由於LEO遠端存儲的特性,其實會導致副本真實的LEO和leader存儲的LEO有短暫的數值差異,這會帶來一些問題,下文也會展開講述。
HW和LEO的更新策略如下:
follower自己的LEO
Follower從leader副本拉取消息,寫入磁碟後,更新LEO值
Leader自己的LEO
Leader收到producer消息,寫入磁碟後,更新LEO值
Leader的遠程LEO
Follower fech時帶上自己的LEO, leader使用這個值更新遠程LEO
Follower的自己HW
followerfetch成功更新LEO後,比較leader發來的hw和自己的hw,取較小值
Leader自己的hw
Leader更新LEO之後,更新完遠程LEO之後,取所有副本的最小LEO
一次完整地寫請求的HW / LEO更新流程如下圖所示:
(1)初始狀態
Leader 所有的 HW&LEO都為0, follower 與 leader 建立連接,follower fetch leader, follower 所有 HW & LEO 都為0
(2)Follower 第一次 fetch
Producer 發來一條消息到 leader, 此時 leader 的 LEO=1, follower 帶著自己的 HW&LEO(都為0) 開始 fetch, leader的 HW=min(all follower LEO)=0, leader 記錄follower的LEO=0;follower 拉取到一條消息,帶著消息和leader的 HW(0)&LEO(1)返回自身更新自己的LEO=1, 更新自己的HW=min(follower 自身 LEO(1) 和 leader HW(0))=0
(3)Follower 第二次fetch
Follower帶著自己的 HW(0)&LEO(1) 去請求leader .此時leader 的HW更新為1,leader 保存的follower的 LEO更新為1,帶著leader 的 HW(1)&LEO(1)返回自身,更新自己的 HW&LEO
此時回到剛才提到的問題,這種HW和LEO更新策略有個很明顯的問題,即follower的HW更新需要follower的2輪fetch中的leader返回才能更新,而Leader的HW已更新。
在這之間,如果follower和leader的節點發生故障,則follower的HW和leader的HW會處於不一致狀態,帶來比較多的一致性問題。比如如下場景:
Leader更新完分區HW後,follower HW還未更新,此時follower重啟;Follower重啟後,LEO設置為之前的follower HW值(0), 此時發生消息截斷(臨時狀態);Follower重新同步leader, 此時leader宕機,則不選舉則不可用;Follower被選舉為leader, 則msg 1 永久丟失了。
在Kafka配置為AP系統的情況下,由於min.insync.replicas為1,這種重啟後follower發生截斷髮生的概率會大大提升, 而在多個副本存在的情況下,情況可能還會更加糟糕。
而kafka新版本為了解決這個HW&LEO的同步機制更新缺陷,引入了Epoch的概念。
Leader epoch 分兩部分組成:
Epoch : 版本號。每當副本領導權發生變更時,都會增加該版本號。小版本號的 Leader 被認為是過期 Leader,不能再行使 Leader 權力。起始位移(Start Offset)。Leader 副本在該 Epoch 值上寫入的首條消息的位移。Leader epoch(1, 120) 說明這個leader 的版本號為1,版本的起始位置是 第120條消息開始的
Kafka Broker 會在內存中為每個分區都緩存 Leader Epoch 數據,同時它還會定期地將這些信息持久化到一個 checkpoint 文件中。
當 Leader 副本寫入消息到磁碟時,Broker 會嘗試更新這部分緩存。如果該 Leader 是首次寫入消息,那麼 Broker 會向緩存中增加一個 Leader Epoch 條目,否則就不做更新。
這樣,每次有 Leader 變更時,新的 Leader 副本會查詢這部分緩存,取出對應的 Leader Epoch 的起始位移,以避免數據丟失和不一致的情況。
示意圖如下:
Kafka通過ISR的同步機制及優化策略,用 HW & LEO的方式很好地確保了數據不丟失以及吞吐率。而ISR的管理最終都會反饋到Zookeeper上,其實現和leader的選舉策略不再贅述。
六、Consumer 的可靠性策略
Consumer的可靠性策略集中在consumer的投遞語義上,即:
何時消費,消費到什麼?消費是否會丟?消費是否會重複?這些語義場景,可以通過Kafka消費者的部分參數進行配置,簡單來說有以下3種場景:
1. AutoCommit(at most once, commit後掛,實際會丟)
<span>enable.<span>auto</span>.commit = <span>true</span></span>
<span><span>auto</span>.commit.interval.ms</span>
配置如上的consumer收到消息就返回正確給 brocker, 但是如果業務邏輯沒有走完中斷了,實際上這個消息沒有消費成功。
這種場景適用於可靠性要求不高的業務。其中auto.commit.interval.ms代表了自動提交的間隔。比如設置為1s提交1次,那麼在1s內的故障重啟,會從當前消費offset進行重新消費時,1s內未提交但是已經消費的msg, 會被重新消費到。
2. 手動Commit(at least once, commit前掛,就會重複, 重啟還會丟)
<span>enable.<span>auto</span>.commit = <span>false</span></span>
配置為手動提交的場景下,業務開發者需要在消費消息到消息業務邏輯處理整個流程完成後進行手動提交。
如果在流程未處理結束時發生重啟,則之前消費到未提交的消息會重新消費到,即消息顯然會投遞多次。此處應用與業務邏輯明顯實現了冪等的場景下使用。
特別應關注到在golang中sarama庫的幾個參數的配置:
<span><span>sarama</span><span>.offset</span><span>.initial</span> (<span>oldest</span>, <span>newest</span>)</span>
<span><span>offsets</span><span>.retention</span><span>.minutes</span></span>
intitial = oldest代表消費可以訪問到的topic裡的最早的消息,大於commit的位置,但是小於HW。同時也受到broker上消息保留時間的影響和位移保留時間的影響。不能保證一定能消費到topic起始位置的消息。
如果設置為newest則代表訪問commit位置的下一條消息。如果發生consumer重啟且autocommit沒有設置為false, 則之前的消息會發生丟失,再也消費不到了。在業務環境特別不穩定或非持久化consumer實例的場景下,應特別注意。
一般情況下, offsets.retention.minutes為1440s。
3. Exactly once(很難,需要msg持久化和commit是原子的)
消息投遞且僅投遞一次的語義是很難實現的。 首先要消費消息並且提交保證不會重複投遞,其次提交前要完成整體的業務邏輯關於消息的處理。
在Kafka本身沒有提供此場景語義接口的情況下,這幾乎是不可能有效實現的。一般的解決方案,也是進行原子性的消息存儲,業務邏輯異步慢慢地從存儲中取出消息進行處理。
原文連結:ttps://mp.weixin.qq.com/s?__biz=MzI2NDU4OTExOQ==&mid=2247510663&idx=1&sn=38d0c75b2f3694fb8a5426108e103944
如果覺得本文對你有幫助,可以評論關注支持一下