原來Kafka源碼也在用二分搜索!

2020-12-08 JavaEdge

設為「星標」,好文章不錯過!

Kafka的索引組件使用二分搜索,而且社區還針對Kafka自身特點對其改良。

1 索引架構

如下幾個類都位於該包下:

AbstractIndex.scala最頂層抽象類:封裝了索引類型的公共操作LazyIndex.scala定義了AbstractIndex上的一個包裝類,實現索引項延遲加載,該類只為提高性能OffsetIndex.scala偏移索引,保存<位移值,文件物理磁碟位置>對。TimeIndex.scala時間戳索引,保存<時間戳,位移值>對。TransactionIndex.scala事務索引,為已中止事務(Aborted Transcation)保存重要元數據。只有啟用Kafka事務特性後,該索引才可能出現

2 AbstractIndex代碼結構

2.1 類定義

2.2 屬性

索引文件(file)每個索引對象在磁碟上都對應一個索引文件。該欄位是var型,說明它可被修改。難道索引對象還能動態更換底層索引文件?是的。1.1.0版本後,Kafka允許遷移底層的日誌路徑,所以,索引文件自然要是可以更換的起始位移值(baseOffset)索引對象對應日誌段對象的起始位移值。查看Kafka日誌路徑,日誌文件和索引文件都是成組出現。比如若日誌文件是00000000000000000123.log,一定還有一組索引文件00000000000000000123.index00000000000000000123.timeindex等。這裡的「123」就是這組文件的起始位移值,即baseOffset索引文件最大字節數(maxIndexSize)控制索引文件的最大長度。Kafka源碼傳入該參數的值是Broker端參數segment.index.bytes值,即10MB。所以默認下所有Kafka索引文件大小都是10MB。索引文件打開方式(writable)「True」:以「讀寫」方式打開,「False」:以「只讀」方式打開。

每個繼承AbstractIndex的子類負責定義具體的索引項結構,基於此架構設計,AbstractIndex定義抽象方法entrySize表示不同索引項的大小

// OffsetIndexoverride def entrySize = 8// TimeIndexoverride def entrySize = 12

為什麼選擇8、12?

在OffsetIndex中,位移值4位元組,物理磁碟位置4位元組,所以共8位元組。但位移值不是長整型嗎,不是應該8位元組?。其實AbstractIndex已保存baseOffset,這裡的位移值,實際上是相對於baseOffset的相對位移值,即

真實位移值 - baseOffset

使用相對位移值能有效節省磁碟空間。而Broker端參數log.segment.bytes是整型,這說明Kafka中每個日誌段文件的大小不會超過2^32,即4GB,這說明同一個日誌段文件上的 位移值 - baseOffset 一定在整數範圍內。因此,源碼只需4位元組保存。

同理,TimeIndex中的時間戳類型是長整型,佔8位元組,位移依然使用相對位移值,佔用4個字節,因此共需12位元組。

3 Kafka的索引底層實現原理

內存映射文件,即Java中的MappedByteBuffer。

內存映射文件的主要優勢在於,它有很高的I/O性能,特別是對於索引這樣的小文件來說,由於文件內存被直接映射到一段虛擬內存上,訪問內存映射文件的速度要快於普通的讀寫文件速度。

在Linux的這段映射的內存區域就是內核的頁緩存(Page Cache)。裡面的數據無需重複拷貝到用戶態空間,避免了大量不必要的時間、空間消耗。

在AbstractIndex中,這個MappedByteBuffer就是名為mmap的變量。接下來,我用注釋的方式,帶你深入了解下這個mmap的主要流程。

這些代碼最主要的作用就是創建mmap對象。要知道,AbstractIndex其他大部分的操作都是和mmap相關。

案例:

計算索引對象中當前有多少個索引項protected var _entries: Int = mmap.position()/ entrySize計算索引文件最多能容納多少個索引項private[this] var _maxEntries: Int = mmap.limit()/ entrySize再進一步,有了這兩個變量,我們就能夠很容易地編寫一個方法,來判斷當前索引文件是否已經寫滿:

def isFull: Boolean = _entries >= _maxEntriesAbstractIndex最重要的就是這個mmap變量。事實上,AbstractIndex繼承類實現添加索引項的主要邏輯,也就是向mmap中添加對應的欄位。

寫入索引項

下面這段代碼是OffsetIndex的append方法,用於向索引文件中寫入新索引項。

append方法的執行流程

查找索引項

索引項的寫入邏輯並不複雜,難點在於如何查找索引項。AbstractIndex定義了抽象方法parseEntry用於查找給定的索引項,如下所示:

protected defparseEntry(buffer: ByteBuffer, n: Int): IndexEntry

「n」表示要查找給定ByteBuffer中保存的第n個索引項(在Kafka中也稱第n個槽)。IndexEntry是源碼定義的一個接口,裡面有兩個方法:indexKey和indexValue,分別返回不同類型索引的<Key,Value>對。

OffsetIndex實現parseEntry的邏輯如下:

overrideprotected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = { OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) }

OffsetPosition是實現IndexEntry的實現類,Key就是之前說的位移值,而Value就是物理磁碟位置值。所以,這裡你能看到代碼調用了relativeOffset(buffer, n) + baseOffset計算出絕對位移值,之後調用physical(buffer, n)計算物理磁碟位置,最後將它們封裝到一起作為一個獨立的索引項返回。

我建議你去看下relativeOffset和physical方法的實現,看看它們是如何計算相對位移值和物理磁碟位置信息的。

有了parseEntry方法,我們就能夠根據給定的n來查找索引項了。但是,這裡還有個問題需要解決,那就是,我們如何確定要找的索引項在第n個槽中呢?其實本質上,這是一個算法問題,也就是如何從一組已排序的數中快速定位符合條件的那個數。

4 二分查找算法

到目前為止,從已排序數組中尋找某個數字最快速的算法就是二分查找了,它能做到O(lgN)的時間複雜度。Kafka的索引組件就應用了二分查找算法。

Kafka索引應用二分查找算法快速定位待查找索引項位置,之後調用parseEntry來讀取索引項。不過,這真的就是無懈可擊的解決方案了嗎?

改進版

顯然不是!我前面說過了,大多數作業系統使用頁緩存來實現內存映射,而目前幾乎所有的作業系統都使用LRU(Least Recently Used)或類似於LRU的機制來管理頁緩存。

Kafka寫入索引文件的方式是在文件末尾追加寫入,而幾乎所有的索引查詢都集中在索引的尾部。這麼來看的話,LRU機制是非常適合Kafka的索引訪問場景的。

但,這裡有個問題是,當Kafka在查詢索引的時候,原版的二分查找算法並沒有考慮到緩存的問題,因此很可能會導致一些不必要的缺頁中斷(Page Fault)。此時,Kafka線程會被阻塞,等待對應的索引項從物理磁碟中讀出並放入到頁緩存中。

下面我舉個例子來說明一下這個情況。假設Kafka的某個索引佔用了作業系統頁緩存13個頁(Page),如果待查找的位移值位於最後一個頁上,也就是Page 12,那麼標準的二分查找算法會依次讀取頁號0、6、9、11和12,具體的推演流程如下所示:

通常來說,一個頁上保存了成百上千的索引項數據。隨著索引文件不斷被寫入,Page #12不斷地被填充新的索引項。如果此時索引查詢方都來自ISR副本或Lag很小的消費者,那麼這些查詢大多集中在對Page #12的查詢,因此,Page #0、6、9、11、12一定經常性地被源碼訪問。也就是說,這些頁一定保存在頁緩存上。後面當新的索引項填滿了Page #12,頁緩存就會申請一個新的Page來保存索引項,即Page #13。

現在,最新索引項保存在Page #13中。如果要查找最新索引項,原版二分查找算法將會依次訪問Page #0、7、10、12和13。此時,問題來了:Page 7和10已經很久沒有被訪問過了,它們大概率不在頁緩存中,因此,一旦索引開始徵用Page #13,就會發生Page Fault,等待那些冷頁數據從磁碟中加載到頁緩存。根據國外用戶的測試,這種加載過程可能長達1秒。

顯然,這是一個普遍的問題,即每當索引文件佔用Page數發生變化時,就會強行變更二分查找的搜索路徑,從而出現不在頁緩存的冷數據必須要加載到頁緩存的情形,而這種加載過程是非常耗時的。

基於這個問題,社區提出了改進版的二分查找策略,也就是緩存友好的搜索算法。總體的思路是,代碼將所有索引項分成兩個部分:熱區(Warm Area)和冷區(Cold Area),然後分別在這兩個區域內執行二分查找算法,如下圖所示:

乍一看,該算法並沒有什麼高大上的改進,僅僅是把搜尋區域分成了冷、熱兩個區域,然後有條件地在不同區域執行普通的二分查找算法罷了。實際上,這個改進版算法提供了一個重要的保證:它能保證那些經常需要被訪問的Page組合是固定的。

想想剛才的例子,同樣是查詢最熱的那部分數據,一旦索引佔用了更多的Page,要遍歷的Page組合就會發生變化。這是導致性能下降的主要原因。

這個改進版算法的最大好處在於,查詢最熱那部分數據所遍歷的Page永遠是固定的,因此大概率在頁緩存中,從而避免無意義的Page Fault。

下面我們來看實際的代碼。我用注釋的方式解釋了改進版算法的實現邏輯。一旦你了解了冷區熱區的分割原理,剩下的就不難了。

5 總結

AbstractIndex是Kafka所有類型索引的抽象父類,裡面的mmap變量是實現索引機制的核心,你一定要掌握它。改進版二分查找算法:社區在標準原版的基礎上,對二分查找算法根據實際訪問場景做了定製化的改進。你需要特別關注改進版在提升緩存性能方面做了哪些努力。改進版能夠有效地提升頁緩存的使用率,從而在整體上降低物理I/O,緩解系統負載瓶頸。你最好能夠從索引這個維度去思考社區在這方面所做的工作。

實際上,無論是AbstractIndex還是它使用的二分查找算法,它們都屬於Kafka索引共性的東西,即所有Kafka索引都具備這些特點或特性。

相關焦點

  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
    目前知識星球內已更新的系列文章:1、Flink 源碼解析 —— 源碼編譯運行2、Flink 源碼解析 —— 項目結構一覽3、Flink 源碼解析—— local 模式啟動流程4、Flink 源碼解析 —— standalonesession 模式啟動流程5、Flink 源碼解析 —— Standalone Session
  • Kafka【入門】就這一篇!
    活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索情況等內容。這種數據通常的處理方式是先把各種活動以日誌的形式寫入某種文件,然後周期性地對這些文件進行統計分析。運營數據指的是伺服器的性能數據(CPU、IO 使用率、請求時間、服務日誌等等數據)。運營數據的統計方法種類繁多。
  • 一文詳解二分搜索樹 ,圖文並茂!
    這也就正好驗證了搜索這個概念。如果我們想要查找的元素大於該節點,我們就去它的右子樹去找,小於就與左子樹就找。  我們知道二分搜索樹的基本規則是一個節點的元素值必須大於它的左子樹所有節點的值小於它的右子樹所有節點的值。我們就是用這條規則進行添加元素。從根節點出發,如果添加的元素大於我們跟節點元素值,我們就去跟節點的右子樹繼續添加元素,直到待插入節點為NULL,我們就把節點插入該位置。
  • kafka使用原理介紹
    1.2 Kafka的使用場景:- 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • kafka極簡教程
    kafka是用於構建實時數據管道和流應用程式。具有橫向擴展,容錯,wicked fast(變態快)等優點,並已在成千上萬家公司運行。
  • 二分查找的妙用:判定子序列
    二分查找本身不難理解,難在巧妙地運用二分查找技巧。對於一個問題,你可能都很難想到它跟二分查找有關,比如前文 最長遞增子序列就藉助一個紙牌遊戲衍生出二分查找解法。今天再講一道巧用二分查找的算法問題:如何判定字符串s是否是字符串t的子序列(可以假定s長度比較小,且t的長度非常大)。
  • 關於Kafka區分請求處理優先級的討論
    通常我們這裡把PRODUCE/FETCH請求稱為數據類請求;把controller發送的那3種請求稱為控制類請求或controller類請求——在源碼中前者被稱為data plane request,後者稱為controller plane request。這種公平處理原則在很多場合下都是不合理的。為什麼?簡單來說控制類請求具有直接令數據類請求失效的能力。
  • kafka入門(原理-搭建-簡單使用)
    前言公司在用kafka接受和發送數據,自己學習過Rabbitmq,不懂kafka挺不爽的,說幹就幹!網上找了許多帖子,學習了很多,小小的demo自己也搭建起來了,美滋滋,下面我認為優秀的網站和自己的步驟展現給大家。
  • 通過問答的方式學習 Go sort 包使用與源碼
    length := rv.Len() quickSort_func(lessSwap{less, swap}, 0, length, maxDepth(length))}可以看到,Slice通過反射獲得 Len()和 Swap()。
  • 大白話+13張圖解 Kafka
    一、Kafka基礎消息系統的作用應該大部份小夥伴都清楚,用機油裝箱舉個例子所以消息系統就是如上圖我們所說的倉庫,能在中間過程作為緩存,並且實現解耦合的作用。可是kafka並不是這樣,比如現在consumerA去消費了一個topicA裡面的數據。
  • Apache Kafka 快速入門指南
    Kafka 對消息保存時根據 Topic 進行歸類,發送消息 者稱為 Producer,消息接受者稱為 Consumer,此外 kafka 集群有多個 kafka 實例組成,每個 實例(server)稱為 broker。
  • Kafka快速入門秘籍:背景介紹,應用場景分析、核心架構分析
    在講實戰前,我們還是有必要講解下理論的,理論為輔,實戰為主,在實戰的基礎上,再深入理解理論,底層原理,底層源碼。下篇文章或者視頻,我們將帶你看官網學習kafka環境搭建、kafka基本用法、kafka的容錯性測試,在掌握知識的同時,還能順便學習下英文。
  • 【Leetcode每日打卡】判斷二分圖
    判斷二分圖」,讓我們一起用「深度優先搜索」、「廣度優先搜索」、「併查集」三種不同的方法搞定它!原題描述力扣 785. 判斷二分圖 難度:Medium給定一個無向圖 graph,當這個圖為二分圖時返回 true。
  • Leetcode刷題-二分查找
    本文對部分涉及二分查找算法的leetcode題目進行了學習與實踐,並給出了個人的二分查找python模板二分查找算法解釋二分查找算法(英語:binary search algorithm),也稱折半搜索算法(英語:half-interval search algorithm
  • CentOS7下簡單搭建zookeeper+kafka集群
    ,配置kakfa1、node1上解壓kafka安裝包cd /opttar -zxf kafka_2.13-2.5.0.tgzmv kafka_2.13-2.5.0 kafkacd kafkacd configcp server.properties server.properties_default
  • Kafka這些名詞都說不出所以然,您竟然敢說自己精通kafka
    kafka基於發布/訂閱模式的消息隊列kafka有那麼多優異的性能,難怪許多大公司都在使用Kafka!kafka,一次消費者去kafka消費消息。producer端的代碼其中指定了一個參數 acks 可以有三個值選擇:0: 實現了at most once,producer完全不管broker的處理結果,回調也就沒有用了
  • Kafka官方文檔翻譯-最新版v2.7(三)
    搜索是以簡單的二進位搜索變化的方式,針對每個文件維護的內存範圍進行的。 日誌提供了獲取最近寫的消息的功能,以便客戶從 "現在 "開始訂閱。在消費者未能在其SLA規定的天數內消費其數據的情況下,這也很有用。在這種情況下,當客戶機試圖消費一個不存在的偏移量時,它將得到一個OutOfRangeException,並且可以根據用例的情況重置自己或失敗。
  • flink-1.12.0 upsert-kafka connector demo
    ,一直沒有試用新的功能,今天剛好試用下 upsert-kafka connector,之前每個版本都自己實現,也是麻煩。The matching candidates:org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryMismatched properties:'connector.type' expects 'kafka', but is 'upsert-kafka'The following properties
  • 二分查找
    二分查找又稱折半查找(Binary Search),是一種效率較高的查找方法。
  • Kafka常見錯誤整理(不斷更新中)
    1、UnknownTopicOrPartitionExceptionorg.apache.kafka.common.errors.UnknownTopicOrPartitionException:Thisserverdoesnothostthistopic-partition報錯內容