生產者
Load balancing
生產者直接發送數據到主分區的伺服器上,不需要經過任何中間路由。 為了讓生產者實現這個功能,所有的 kafka 伺服器節點都能響應這樣的元數據請求: 哪些伺服器是活著的,主題的哪些分區是主分區,分配在哪個伺服器上,這樣生產者就能適當地直接發送它的請求到伺服器上。
客戶端控制消息發送數據到哪個分區,這個可以實現隨機的負載均衡方式,或者使用一些特定語義的分區函數。 我們有提供特定分區的接口讓用於根據指定的鍵值進行hash分區(當然也有選項可以重寫分區函數),例如,如果使用用戶ID作為key,則用戶相關的所有數據都會被分發到同一個分區上。 這允許消費者在消費數據時做一些特定的本地化處理。這樣的分區風格經常被設計用於一些本地處理比較敏感的消費者。
Asynchronous send
批處理是提升性能的一個主要驅動,為了允許批量處理,kafka 生產者會嘗試在內存中匯總數據,並用一次請求批次提交信息。 批處理,不僅僅可以配置指定的消息數量,也可以指定等待特定的延遲時間(如64k 或10ms),這允許匯總更多的數據後再發送,在伺服器端也會減少更多的IO操作。 該緩衝是可配置的,並給出了一個機制,通過權衡少量額外的延遲時間獲取更好的吞吐量。
4.5 消費者
Kafka consumer通過向 broker 發出一個「fetch」請求來獲取它想要消費的 partition。consumer 的每個請求都在 log 中指定了對應的 offset,並接收從該位置開始的一大塊數據。因此,consumer 對於該位置的控制就顯得極為重要,並且可以在需要的時候通過回退到該位置再次消費對應的數據。
Push vs. pull
最初我們考慮的問題是:究竟是由 consumer 從 broker 那裡 pull 數據,還是由 broker 將數據 push 到 consumer。Kafka 在這方面採取了一種較為傳統的設計方式,也是大多數的消息系統所共享的方式:即 producer 把數據 push 到 broker,然後 consumer 從 broker 中 pull 數據。
也有一些 logging-centric 的系統,比如 Scribe 和 Apache Flume,沿著一條完全不同的 push-based 的路徑,將數據 push 到下遊節點。這兩種方法都有優缺點。然而,由於 broker 控制著數據傳輸速率, 所以 push-based 系統很難處理不同的 consumer。
讓 broker 控制數據傳輸速率主要是為了讓 consumer 能夠以可能的最大速率消費;不幸的是,這導致著在 push-based 的系統中,當消費速率低於生產速率時,consumer 往往會不堪重負(本質上類似於拒絕服務攻擊)。pull-based 系統有一個很好的特性, 那就是當 consumer 速率落後於 producer 時,可以在適當的時間趕上來。
還可以通過使用某種 backoff 協議來減少這種現象:即 consumer 可以通過 backoff 表示它已經不堪重負了,然而通過獲得負載情況來充分使用 consumer(但永遠不超載)這一方式實現起來比它看起來更棘手。前面以這種方式構建系統的嘗試,引導著 Kafka 走向了更傳統的 pull 模型。
另一個 pull-based 系統的優點在於:它可以大批量生產要發送給 consumer 的數據。而 push-based 系統必須選擇立即發送請求或者積累更多的數據,然後在不知道下遊的 consumer 能否立即處理它的情況下發送這些數據。如果系統調整為低延遲狀態,這就會導致一次只發送一條消息,以至於傳輸的數據不再被緩衝,這種方式是極度浪費的。
而 pull-based 的設計修復了該問題,因為 consumer 總是將所有可用的(或者達到配置的最大長度)消息 pull 到 log 當前位置的後面,從而使得數據能夠得到最佳的處理而不會引入不必要的延遲。
簡單的 pull-based 系統的不足之處在於:如果 broker 中沒有數據,consumer 可能會在一個緊密的循環中結束輪詢,實際上 busy-waiting 直到數據到來。為了避免 busy-waiting,我們在 pull 請求中加入參數,使得 consumer 在一個「long pull」中阻塞等待,直到數據到來(還可以選擇等待給定字節長度的數據來確保傳輸長度)。
你可以想像其它可能的只基於 pull得, end-to-end 的設計。例如producer 直接將數據寫入一個本地的 log,然後 broker 從 producer 那裡 pull 數據,最後 consumer 從 broker 中 pull 數據。通常提到的還有「store-and-forward」式 producer, 這是一種很有趣的設計,但我們覺得它跟我們設定的有數以千計的生產者的應用場景不太相符。
我們在運行大規模持久化數據系統方面的經驗使我們感覺到,橫跨多個應用、涉及數千磁碟的系統事實上並不會讓事情更可靠,反而會成為操作時的噩夢。在實踐中, 我們發現可以通過大規模運行的帶有強大的 SLAs 的 pipeline,而省略 producer 的持久化過程。
消費者的位置
令人驚訝的是,持續追蹤已經被消費的內容是消息系統的關鍵性能點之一。
大多數消息系統都在 broker 上保存被消費消息的元數據。也就是說,當消息被傳遞給 consumer,broker 要麼立即在本地記錄該事件,要麼等待 consumer 的確認後再記錄。這是一種相當直接的選擇,而且事實上對於單機伺服器來說,也沒與其它地方能夠存儲這些狀態信息。
由於大多數消息系統用於存儲的數據結構規模都很小,所以這也是一個很實用的選擇——因為只要 broker 知道哪些消息被消費了,就可以在本地立即進行刪除,一直保持較小的數據量。
也許不太明顯,但要讓 broker 和 consumer 就被消費的數據保持一致性也不是一個小問題。如果 broker 在每條消息被發送到網絡的時候,立即將其標記為 consumed,那麼一旦 consumer 無法處理該消息(可能由 consumer 崩潰或者請求超時或者其他原因導致),該消息就會丟失。 為了解決消息丟失的問題,許多消息系統增加了確認機制:即當消息被發送出去的時候,消息僅被標記為sent而不是 consumed;然後 broker 會等待一個來自 consumer 的特定確認,再將消息標記為consumed。
這個策略修復了消息丟失的問題,但也產生了新問題。 首先,如果 consumer 處理了消息但在發送確認之前出錯了,那麼該消息就會被消費兩次。第二個是關於性能的,現在 broker 必須為每條消息保存多個狀態(首先對其加鎖,確保該消息只被發送一次,然後將其永久的標記為 consumed,以便將其移除)。 還有更棘手的問題要處理,比如如何處理已經發送但一直得不到確認的消息。
Kafka 使用完全不同的方式解決消息丟失問題。Kafka的 topic 被分割成了一組完全有序的 partition,其中每一個 partition 在任意給定的時間內只能被每個訂閱了這個 topic 的 consumer 組中的一個 consumer 消費。
這意味著 partition 中 每一個 consumer 的位置僅僅是一個數字,即下一條要消費的消息的offset。這使得被消費的消息的狀態信息相當少,每個 partition 只需要一個數字。這個狀態信息還可以作為周期性的 checkpoint。這以非常低的代價實現了和消息確認機制等同的效果。
這種方式還有一個附加的好處。consumer 可以回退到之前的 offset 來再次消費之前的數據,這個操作違反了隊列的基本原則,但事實證明對大多數 consumer 來說這是一個必不可少的特性。 例如,如果 consumer 的代碼有 bug,並且在 bug 被發現前已經有一部分數據被消費了, 那麼 consumer 可以在 bug 修復後通過回退到之前的 offset 來再次消費這些數據。
離線數據加載
可伸縮的持久化特性允許 consumer 只進行周期性的消費,例如批量數據加載,周期性將數據加載到諸如 Hadoop 和關係型資料庫之類的離線系統中。
在 Hadoop 的應用場景中,我們通過將數據加載分配到多個獨立的 map 任務來實現並行化,每一個 map 任務負責一個 node/topic/partition,從而達到充分並行化。Hadoop 提供了任務管理機制,失敗的任務可以重新啟動而不會有重複數據的風險,只需要簡單地從原來的位置重啟即可。