Kafka具有快速,可擴展,耐用和容錯的發布、訂閱消息傳遞系統,被用於JMS(Java消息隊列服務)。
Kafka具有更高的吞吐量,可靠性和複製特性,使其適用於跟蹤服務呼叫或物聯網傳感器數據。
kafka與內存微服務一起使用,提供可靠性,可用於向 CEP(複雜事件流系統)和IoT / IFTTT式自動化系統提供事件。
Kafka可以與Flume ,Spark Streaming,Storm,HBase,Flink和Spark一起工作,以實時接收,分析和處理流數據。
Kafka代理支持在Hadoop或Spark中進行低延遲後續分析的大量消息流,及Kafka流媒體可用於實時分析。
…………
kafka有那麼多優異的性能,難怪許多大公司都在使用Kafka!
01kafka架構
Kafka 運行在一個由一臺或多臺伺服器組成的集群上,並且分區可以跨集群分布。
Kafka 存儲的消息來自任意多被稱為 Producer 生產者的進程。數據從而可以被發布到不同的 Topic 主題下的不同 Partition 分區。在一個分區內,這些消息被索引,並連同時間戳存儲在一起。其它被稱為 Consumer 消費者的進程,可以從分區訂閱消息。
Kafka 一些重要概念,您真的認識熟悉?
Producer: 消息生產者,向 Kafka Broker 發消息的客戶端。
Consumer: 消息消費者,從Kafka Broker 取消息的客戶端。
Consumer Group: 消費者組(CG),消費者組內每個消費者,負責消費不同分區的數據,提高消費能力。一個分區只能由組內一個消費者消費,消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
Broker:一臺 Kafka 機器就是一個 broker。一個集群由多個 broker 組成。一個 broker 可以容納多個 topic。
Topic: 可以理解為一個隊列,topic 將消息分類,生產者和消費者面向的是同一個 topic。
Partition: 為了實現擴展性,提高並發能力,一個非常大的 topic ,可以分布到多個 broker (即伺服器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列。
Replica: 副本,為實現備份的功能,保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失,且 Kafka 仍然能夠繼續工作,Kafka 提供了副本機制,一個 topic 的每個分區,都有若干個副本,一個 leader 和若干個 follower。
Leader: 每個分區多個副本的「主」副本,生產者發送數據的對象,以及消費者消費數據的對象,都是leader。
Follower: 每個分區多個副本的「從」副本,實時從 leader 中同步數據,保持和 leader 數據的同步。leader 發生故障時,某個 follower 還會成為新的 leader。
offset: 消費者消費的位置信息,監控數據消費到什麼位置,當消費者掛掉,再重新恢復的時候,可以從消費位置繼續消費。
Zookeeper: Kafka 集群能夠正常工作,需要依賴於 zookeeper,zookeeper 幫助Kafka 存儲和管理集群信息。
02kafka工作流程
Kafka集群是一個分布式流平臺,將 Record 流存儲在稱為 topic 的類別中,每個記錄由一個鍵、一個值和一個時間戳組成。
發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統:Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,面向的都是同一個 topic。以容錯的持久方式存儲記錄流:topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應於一個 log 文件,該 log 文件中存儲的就是 Producer 生產的數據。處理記錄流:Producer 生產的數據,會不斷追加到該 log 文件末端,且每條數據都有自己的 offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,從上次的位置繼續消費。
03Kafka存儲機制
由於生產者生產的消息,會不斷追加到 log 文件末尾,為防止log 文件過大,導致數據定位效率低下,Kafka 採取了分片和索引機制,將每個 partition分為多個 segment(段),每個 segment 對應兩個文件:「. index」 索引文件和 「. log」 數據文件。
這些文件位於同一文件下,該文件夾的命名規則為:topic 名-分區號。例如,first 這個 topic 有三分分區,則其對應的文件夾為 first-0,first-1,first-2。
index 和 log 文件以當前 segment的第一條消息的 offset 命名。下圖為 index 文件 和 log 文件的結構示意圖。
「.index」 文件存儲大量的索引信息,「.log」 文件存儲大量的數據,索引文件中的元數據指向對應數據文件中 message 的物理偏移量
04Kafka如何保證百萬級寫入速度?
1、頁緩存技術 + 磁碟順序寫,針對於Producer: 消息生產者
首先Kafka是基於作業系統的頁緩存來實現文件寫入的。
作業系統本身有一層緩存,叫做page cache,是在內存裡的緩存,我們也可以稱之為os cache,意思就是作業系統自己管理的緩存。
你在寫入磁碟文件的時候,可以直接寫入這個os cache裡,也就是僅僅寫入內存中,接下來由作業系統自己,決定什麼時候把os cache裡的數據,真的刷入磁碟文件中。
因為其實這裡相當於是在寫內存,不是在寫磁碟,大家看下圖。
接著另外一個就是kafka寫數據的時候,非常關鍵的一點,他是以磁碟順序寫的方式來寫的。也就是說,僅僅將數據追加到文件的末尾,不是在文件的隨機位置,來修改數據。
普通的機械磁碟,如果你要是隨機寫的話,確實性能極差,也就是隨便找到文件的某個位置來寫數據。
但是如果你是追加文件末尾按照順序的方式,來寫數據的話,那麼這種磁碟順序寫的性能,基本上可以跟寫內存的性能本身也是差不多的。
Kafka在寫數據的時候,一方面基於了os層面的page cache來寫數據,本質就是在寫內存罷了;另一方面,採用磁碟順序寫的方式,即使數據刷入磁碟的時候,性能也是極高的,也跟寫內存是差不多的。
所以要保證每秒寫入幾萬,甚至幾十萬條數據的核心點,就是盡最大可能提升每條數據寫入的性能,這樣就可以在單位時間內,寫入更多的數據量,提升吞吐量。
2、零拷貝技術,針對於Consumer: 消息消費者
從Kafka裡經常要消費數據,那麼消費的時候,實際上,就是要從kafka的磁碟文件裡,讀取某條數據,然後發送給下遊的消費者,如下圖所示。
那麼這裡如果頻繁的從磁碟讀數據,然後發給消費者,性能瓶頸在哪裡呢?
假設要是kafka什麼優化都不做,就是很簡單的從磁碟讀數據發送給下遊的消費者,那麼大概過程如下所示:
先看看要讀的數據在不在os cache裡,如果不在的話,就從磁碟文件裡讀取數據後,放入os cache。
接著從作業系統的os cache裡,拷貝數據到應用程式進程的緩存裡,再從應用程式進程的緩存裡,拷貝數據到作業系統層面的Socket緩存裡,最後從Socket緩存裡,提取數據後發送到網卡,最後發送出去給下遊消費。
整個過程,如下圖所示:
看上圖,很明顯可以看到,有兩次沒必要的拷貝吧!
一次是從作業系統的cache裡,拷貝到應用進程的緩存裡,接著又從應用程式緩存裡拷貝回作業系統的Socket緩存裡。而且為了進行這兩次拷貝,中間還發生了好幾次上下文切換,一會兒是應用程式在執行,一會兒上下文切換到作業系統來執行。
所以這種方式來讀取數據是比較消耗性能的。
Kafka為了解決這個問題,在讀數據的時候是引入零拷貝技術。
也就是說,直接讓作業系統的cache中的數據,發送到網卡後,傳輸給下遊的消費者,中間跳過了兩次拷貝數據的步驟,Socket緩存中僅僅會拷貝一個描述符過去,不會拷貝數據到Socket緩存。
大家看下圖,體會一下這個精妙的過程:
通過零拷貝技術,就不需要把os cache裡的數據,拷貝到應用緩存,再從應用緩存拷貝到Socket緩存了,兩次拷貝都省略了,所以叫做零拷貝。
對Socket緩存,僅僅就是拷貝數據的描述符過去,然後數據就直接從os cache中,發送到網卡上去了,這個過程大大的提升了數據消費時,讀取文件數據的性能。
而且大家會注意到,在從磁碟讀數據的時候,會先看看os cache內存中是否有,如果有的話,其實讀數據都是直接讀內存的。
如果kafka集群經過良好的調優,大家會發現大量的數據,都是直接寫入os cache中,然後讀數據的時候,也是從os cache中讀。
相當於是Kafka完全基於內存提供數據的寫和讀了,所以這個整體性能會極其的高。
這個原理與Elasticsearch的架構原理類似,其實ES底層,也是大量基於os cache,實現了海量數據的高性能檢索的。
05Kafka如何做到不丟失不重複消費
kafka其實有兩次消息傳遞,一次生產者發送消息給kafka,一次消費者去kafka消費消息。
兩次傳遞都會影響最終結果,兩次都是精確一次,最終結果才是精確一次。
1、Produce端消息傳遞
其中指定了一個參數 acks 可以有三個值選擇:
0: 實現了at most once,producer完全不管broker的處理結果,回調也就沒有用了,並不能保證消息成功發送,但是這種吞吐量最高。
-1或者all: leader broker會等消息寫入 並且ISR(In-Sync Replicas)都寫入後,才會響應,這種只要ISR有副本存活就肯定不會丟失,但吞吐量最低。
1: 默認值為1,producer級別是at least once。並不能exactly once。 leader broker自己寫入後就響應,不會等待ISR其他的副本寫入,只要leader broker存活就不會丟失,即保證了不丟失,也保證了吞吐量。
若消息成功寫入,而這個時候由於網絡問題,producer沒有收到寫入成功的響應,producer就會開啟重試的操作,直到網絡恢復,消息就發送了多次。這就是at least once了。
2、Consumer端消息傳遞
其中有一個參數是 enable.auto.commit,若設置為true,自動提交的機制,consumer在消費前,提交offset,就實現了at most once;若設置為false,是消費後提交,那麼 auto.commit.interval.ms 也就不被再考慮了,就實現了 at least once 。
06kafka可以在消費端實現"Exactly Once"
通過了解producer端與consumer端的設置,發現kafka在兩端的默認配置,都是at least once,可能重複,通過配置也不能做到exactly once,好像kafka的消息一定會丟失或者重複的,是不是沒有辦法做到exactly once了呢?
確實在kafka 0.11.0.0版本之前producer端確實是不可能的,但是在kafka 0.11.0.0版本之後,kafka正式推出了idempotent(冪等的) producer,及對事務的支持。
冪等的producer
kafka 0.11.0.0版本引入了idempotent producer機制,在這個機制中,同一消息可能被producer發送多次,但是在broker端只會寫入一次,他為每一條消息編號去重,而且對kafka開銷影響不大。
如何設置開啟呢? 需要設置producer端的新參數 enable.idempotent 為true。
而多分區的情況,我們需要保證原子性的寫入多個分區,即寫入到多個分區的消息要麼全部成功,要麼全部回滾。
這時候就需要使用事務,在producer端設置 transcational.id,為一個指定字符串。
這樣冪等producer,只能保證單分區上無重複消息;事務可以保證多分區寫入消息的完整性。
這樣producer端實現了exactly once,那麼consumer端呢?
consumer端,由於可能無法消費事務中所有消息,並且消息可能被刪除,所以事務並不能解決consumer端exactly once的問題,我們可能還是需要自己處理這方面的邏輯。比如自己管理offset的提交,不要自動提交,也是可以實現exactly once的。
還有一個選擇就是使用kafka自己的流處理引擎,也就是Kafka Streams,設置processing.guarantee=exactly_once,就可以輕鬆實現exactly once了。