Kafka原理與技術

2021-03-06 過往記憶大數據


本文轉載自:http://www.linkedkeeper.com/detail/blog.action?bid=1016,點擊下面 閱讀原文 即可進入


Kafka最初由Linkedin公司開發,是一個分布式、分區、多副本、多訂閱者,基於zookeeper協調的分布式日誌系統(也可以當做MQ系統),常用於web/nginx日誌、訪問日誌,消息服務等等場景。Linkedin於2010年貢獻給了Apache基金會並成為頂級開源項目。

主要應用場景是:日誌收集系統和消息系統。

Kafka主要設計目標如下:

以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能。

高吞吐率,即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。

支持Kafka Server間的消息分區,及分布式消費,同時保證每個partition內的消息順序傳輸。

同時支持離線數據處理和實時數據處理。

Kafka整體系統架構設計如下:


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

一個典型的 Kafka 集群包含若干 Producer,若干 Broker,若干 Consumer,以及一個 Zookeeper 集群。Kafka 通過 Zookeeper 管理集群配置,選舉 Leader,以及在 Consumer Group發生變化時進行 Rebalance。Producer 使用 push
模式將消息發布到 Broker;Consumer 使用 pull 模式從 Broker 訂閱並消費消息。  

Kafka專用術語:

Broker:消息中間件處理結點,一個Kafka節點就是一個Broker,多個Broker可以組成一個Kafka集群。

Topic:一類消息,Kafka集群能夠同時負責多個topic的分發。

Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。

Segment:partition物理上由多個segment組成。

offset:每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每個消息都有一個連續的序列號叫做offset,用於partition唯一標識一條消息。

Producer:負責發布消息到Kafka broker。

Consumer:消息消費者,向Kafka broker讀取消息的客戶端。

Consumer Group:每個Consumer屬於一個特定的Consumer Group。

at most once:最多一次,這個和JMS中"非持久化"消息類似,發送一次,無論成敗,將不會重發。消費者fetch消息,然後保存offset,然後處理消息;當client保存offset之後,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理。那麼此後"未處理"的消息將不能被fetch到,這就是"at most once"。

at least once:消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功。消費者fetch消息,然後處理消息,然後保存offset。如果消息處理成功之後,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。

exactly once:消息只會發送一次。kafka中並沒有嚴格的去實現(基於2階段提交),我們認為這種策略在kafka中是沒有必要的。

通常情況下 at-least-once 是我們首選。

Topic & Partition

同一個Topic 通常存儲的是一類消息,每個topic內部實現又被分成多個partition,每個partition在存儲層面是append log文件。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

這樣做的好處就是能快速刪除無用文件,有效提高磁碟利用率。

segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,後綴」.index」和「.log」分別表示為segment索引文件、數據文件。

segment文件命名規則:partion全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

segment中index與data file對應關係物理結構如下:


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

上圖中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。

其中以索引文件中元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message),以及該消息的物理偏移地址為497。

了解到segment data file由許多message組成,下面詳細說明message物理結構如下:


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

參數說明:

關鍵字解釋說明8 byte offset在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱為偏移(offset),
它可以唯一確定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message4 byte message sizemessage大小4 byte CRC32用crc32校驗message1 byte 「magic"表示本次發布Kafka服務程序協議版本號1 byte 「attributes"表示為獨立版本、或標識壓縮類型、或編碼類型。4 byte key length表示key的長度,當key為-1時,K byte key欄位不填K byte key可選value bytes payload表示實際消息數據。副本(replication)策略

Kafka的高可靠性的保障來源於其健壯的副本(replication)策略。

1) 數據同步

kafka在0.8版本前沒有提供Partition的Replication機制,一旦Broker宕機,其上的所有Partition就都無法提供服務,而Partition又沒有備份數據,數據的可用性就大大降低了。所以0.8後提供了Replication機制來保證Broker的failover。

引入Replication之後,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica作為Follower從Leader中複製數據。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

2) 副本放置策略

為了更好的做負載均衡,Kafka儘量將所有的Partition均勻分配到整個集群上。Kafka分配Replica的算法如下:

將所有存活的N個Brokers和待分配的Partition排序

將第i個Partition分配到第(i mod n)個Broker上,這個Partition的第一個Replica存在於這個分配的Broker上,並且會作為partition的優先副本

將第i個Partition的第j個Replica分配到第((i + j) mod n)個Broker上

假設集群一共有4個brokers,一個topic有4個partition,每個Partition有3個副本。下圖是每個Broker上的副本分配情況。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

3) 同步策略

Producer在發布消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少,Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。

為了提高性能,每個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。因此,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁碟中,也就不能完全保證異常發生後該條消息一定能被Consumer消費。

Consumer讀消息也是從Leader讀取,只有被commit過的消息才會暴露給Consumer。

Kafka Replication的數據流如下圖所示:


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

對於Kafka而言,定義一個Broker是否「活著」包含兩個條件:

Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的「落後太多」指Follower複製的消息落後於Leader後的條數超過預定值或者Follower超過一定時間未向Leader發送fetch請求。

Kafka只解決fail/recover,一條消息只有被ISR裡的所有Follower都從Leader複製過去才會被認為已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而造成數據丟失(Consumer無法消費這些數據)。而對於Producer而言,它可以選擇是否等待消息commit。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。

4) leader選舉

Leader選舉本質上是一個分布式鎖,有兩種方式實現基於ZooKeeper的分布式鎖:

Majority Vote的選舉策略和ZooKeeper中的Zab選舉是類似的,實際上ZooKeeper內部本身就實現了少數服從多數的選舉策略。kafka中對於Partition的leader副本的選舉採用了第一種方法:為Partition分配副本,指定一個ZNode臨時節點,第一個成功創建節點的副本就是Leader節點,其他副本會在這個ZNode節點上註冊Watcher監聽器,一旦Leader宕機,對應的臨時節點就會被自動刪除,這時註冊在該節點上的所有Follower都會收到監聽器事件,它們都會嘗試創建該節點,只有創建成功的那個follower才會成為Leader(ZooKeeper保證對於一個節點只有一個客戶端能創建成功),其他follower繼續重新註冊監聽事件。

Kafka消息分組,消息消費原理

同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

這是Kafka用來實現一個Topic消息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group裡。用Consumer Group還可以將Consumer進行自由的分組而不需要多次發送消息到不同的Topic。

Push vs. Pull

作為一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息並由Consumer從broker pull消息。

push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據Consumer的消費能力以適當的速率消費消息。

對於Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

Kafak順序寫入與數據讀取

生產者(producer)是負責向Kafka提交數據的,Kafka會把收到的消息都寫入到硬碟中,它絕對不會丟失數據。為了優化寫入速度Kafak採用了兩個技術,順序寫入和MMFile。

順序寫入

因為硬碟是機械結構,每次讀寫都會尋址,寫入,其中尋址是一個「機械動作」,它是最耗時的。所以硬碟最「討厭」隨機I/O,最喜歡順序I/O。為了提高讀寫硬碟的速度,Kafka就是使用順序I/O。

每條消息都被append到該Partition中,屬於順序寫磁碟,因此效率非常高。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka是不會刪除數據的,它會把所有的數據都保留下來,每個消費者(Consumer)對每個Topic都有一個offset用來表示讀取到了第幾條數據。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

即便是順序寫入硬碟,硬碟的訪問速度還是不可能追上內存。所以Kafka的數據並不是實時的寫入硬碟,它充分利用了現代作業系統分頁存儲來利用內存提高I/O效率。

在Linux Kernal 2.2之後出現了一種叫做「零拷貝(zero-copy)」系統調用機制,就是跳過「用戶緩衝區」的拷貝,建立一個磁碟空間和內存空間的直接映射,數據不再複製到「用戶態緩衝區」系統上下文切換減少2次,可以提升一倍性能。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

通過mmap,進程像讀寫硬碟一樣讀寫內存(當然是虛擬機內存)。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內核空間複製的開銷(調用文件的read會把數據先放到內核空間的內存中,然後再複製到用戶空間的內存中。)

消費者(讀取數據)

試想一下,一個Web Server傳送一個靜態文件,如何優化?答案是zero copy。傳統模式下我們從硬碟讀取一個文件是這樣的。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

先複製到內核空間(read是系統調用,放到了DMA,所以用內核空間),然後複製到用戶空間(1、2);從用戶空間重新複製到內核空間(你用的socket是系統調用,所以它也有自己的內核空間),最後發送給網卡(3、4)。


如果想及時了解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

Zero Copy中直接從內核空間(DMA的)到內核空間(Socket的),然後發送網卡。這個技術非常普遍,Nginx也是用的這種技術。

實際上,Kafka把所有的消息都存放在一個一個的文件中,當消費者需要數據的時候Kafka直接把「文件」發送給消費者。當不需要把整個文件發出去的時候,Kafka通過調用Zero Copy的sendfile這個函數,這個函數包括:

相關焦點

  • kafka入門(原理-搭建-簡單使用)
    前言公司在用kafka接受和發送數據,自己學習過Rabbitmq,不懂kafka挺不爽的,說幹就幹!網上找了許多帖子,學習了很多,小小的demo自己也搭建起來了,美滋滋,下面我認為優秀的網站和自己的步驟展現給大家。
  • kafka極簡教程
    Apache kafka是消息中間件的一種,我發現很多人不知道消息中間件是什麼,在開始學習之前,我這邊就先簡單的解釋一下什麼是消息中間件,只是粗略的講解,目前kafka已經可以做更多的事情。再比如生產者很強勁(大交易量的情況),生產者1秒鐘生產100個雞蛋,消費者1秒鐘只能吃50個雞蛋,那要不了一會,消費者就吃不消了(消息堵塞,最終導致系統超時),消費者拒絕再吃了,」雞蛋「又丟失了,這個時候我們放個籃子在它們中間,生產出來的雞蛋都放到籃子裡,消費者去籃子裡拿雞蛋,這樣雞蛋就不會丟失了,都在籃子裡,而這個籃子就是」kafka「。
  • kafka使用原理介紹
    1.前言消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集群。2.2 kafka一些原理概念1.持久化kafka使用文件存儲消息(append only log
  • Kafka 基本原理(8000 字小結)
    1)kafka以topic來進行消息管理,每個topic包含多個partition,每個partition對應一個邏輯log,有多個segment組成。2)每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
  • Kafka這些名詞都說不出所以然,您竟然敢說自己精通kafka
    1、頁緩存技術 + 磁碟順序寫,針對於Producer: 消息生產者2、零拷貝技術,針對於Consumer: 消息消費者從Kafka裡經常要消費數據,那麼消費的時候,實際上,就是要從kafka的磁碟文件裡,讀取某條數據,然後發送給下遊的消費者,如下圖所示。
  • Apache Kafka 快速入門指南
    ❝「寫在前面」:我是「雲祁」,一枚熱愛技術、會寫詩的大數據開發猿。暱稱來源於王安石詩中一句 [ 雲之祁祁,或雨於淵 ],甚是喜歡。寫博客一方面是對自己學習的一點點總結及記錄,另一方面則是希望能夠幫助更多對大數據感興趣的朋友。
  • 圖解SparkStreaming與Kafka的整合,細節滿滿
    今天講述的是SparkStreaming與Kafka的整合,這篇文章非常適合剛入門的小夥伴,也歡迎大家前來發表意見,老劉這次會用圖片的形式講述別人技術博客沒有的一些細節,這些細節對剛入門的小夥伴是非常有用的!!!正文為什麼會有SparkStreaming與Kafka的整合?
  • 大白話+13張圖解 Kafka
    Topic是一個邏輯上的概念,並不能直接在圖中把Topic的相關單元畫出需要注意:kafka在0.8版本以前是沒有副本機制的,所以在面對伺服器宕機的突發情況時會丟失數據,所以儘量避免使用這個版本之前的kafkaReplica - 副本kafka中的partition
  • Kafka常見錯誤整理(不斷更新中)
    1、UnknownTopicOrPartitionExceptionorg.apache.kafka.common.errors.UnknownTopicOrPartitionException:Thisserverdoesnothostthistopic-partition報錯內容
  • CentOS7下簡單搭建zookeeper+kafka集群
    ,配置kakfa1、node1上解壓kafka安裝包cd /opttar -zxf kafka_2.13-2.5.0.tgzmv kafka_2.13-2.5.0 kafkacd kafkacd configcp server.properties server.properties_default
  • 從未如此簡單:10分鐘帶你逆襲Kafka!
    ZeroMQ 能夠實現 RabbitMQ 不擅長的高級/複雜的隊列,但是開發人員需要自己組合多種技術框架,技術上的複雜度是對這 MQ 能夠應用成功的挑戰。ZeroMQ 具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息伺服器或中間件,因為你的應用程式將扮演這個伺服器角色。
  • Flume+Kafka+Storm+Redis構建大數據實時處理系統
    ,採集到數據之後,輸出到kafka#對於source的配置描述 監聽avroa1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 44444#對於sink的配置描述 使用kafka做數據的消費a1.sinks.k1.type
  • 你能說出 Kafka 這些原理嗎?
    作者 | cxuan責編 | Elle如果只是為了開發 Kafka 應用程式,或者只是在生產環境使用 Kafka,那麼了解 Kafka 的內部工作原理不是必須的。不過,了解 Kafka 的內部工作原理有助於理解 Kafka 的行為,也利用快速診斷問題。
  • flink-1.12.0 upsert-kafka connector demo
    ,一直沒有試用新的功能,今天剛好試用下 upsert-kafka connector,之前每個版本都自己實現,也是麻煩。使用 sqlSubmit 提交之前的 kafka upsert sqlCREATE TABLE user_log ( user_id VARCHAR ,item_id VARCHAR ,category_id VARCHAR ,behavior VARCHAR ,ts TIMESTAMP(3)) WITH
  • Kafka官方文檔翻譯-最新版v2.7(三)
    一組消息在kafka中的技術術語是記錄批(Record Batch),一個記錄批(Record Batch)包含一條或多條記錄。退一步講,我們可以讓一個記錄批(Record Batch)包含一條記錄。記錄批(Record Batch)和記錄(Record)都有自己的標題。下面將介紹每種格式。
  • Kafka快速入門秘籍:背景介紹,應用場景分析、核心架構分析
    在講實戰前,我們還是有必要講解下理論的,理論為輔,實戰為主,在實戰的基礎上,再深入理解理論,底層原理,底層源碼。下篇文章或者視頻,我們將帶你看官網學習kafka環境搭建、kafka基本用法、kafka的容錯性測試,在掌握知識的同時,還能順便學習下英文。
  • 利用flume+kafka+storm+mysql構建大數據實時系統
    在我們系統中由kafka來接收。下圖為kafka的架構圖:和storm整合1.下載kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus2.該項目下載下來需要調試下,找到依賴jar包。
  • kafka異步雙活方案 mirror maker2深度解析
    mirror maker2背景通常情況下,我們都是使用一套kafka集群處理業務。但有些情況需要使用另一套kafka集群來進行數據同步和備份。在kafka早先版本的時候,kafka針對這種場景就有推出一個叫mirror maker的工具(mirror maker1,以下mm1即代表mirror maker1),用來同步兩個kafka集群的數據。
  • Kafka【入門】就這一篇!
    第一步:下載 Kafka這裡以 Mac OS 為例,在安裝了 Homebrew 的情況下執行下列代碼:brew install kafka由於 Kafka 依賴了 Zookeeper,所以在下載的時候會自動下載。
  • Flink保證端到端exactly-once語義(也適用於kafka)
    當然,TwoPhaseCommitSinkFunction的數據輸出包括apache kafka 0.11以上的版本。flink提供了一個抽象的TwoPhaseCommitSinkFunction類,來讓開發者用更少的代碼來實現端到端的exactly-once語義。