Kafka官方文檔翻譯-最新版v2.7(三)

2021-02-21 開源輪子
5. 實現
5.1 網絡層

網絡層是一個NIO伺服器,此處不展開詳細介紹了。sendfile的實現是通過給MessageSet接口新增一個writeTo方法來完成的。這樣就可以讓文件支持的消息集使用效率更高的transferTo實現,而不是進程內的緩衝寫。線程模型是一個acceptor線程和N個處理線程,每個處理線程處理固定數量的連接。這種設計已經在其他地方進行了相當徹底的測試,發現它的實現簡單而快速。協議簡單,以便於將來用其他語言實現客戶端。

 

5.2 消息

消息由一個可變長度的頭、一個可變長度的不透明密鑰字節數組和一個可變長度的不透明值字節數組組成。報文頭的格式在下一節中描述。讓鍵和值不透明是一個正確的決定:現在序列化庫取得了很大的進展,任何特定的選擇都不可能適用所有的用途。一個使用Kafka的特定應用程式很可能要求使用特定的序列化類型。RecordBatch接口只是一個消息的迭代器,它使用專門的方法用於批量讀寫NIO通道。

 

5.3 消息格式

消息(也就是記錄)總是分批寫入的。一組消息在kafka中的技術術語是記錄批(Record Batch),一個記錄批(Record Batch)包含一條或多條記錄。退一步講,我們可以讓一個記錄批(Record Batch)包含一條記錄。記錄批(Record Batch)和記錄(Record)都有自己的標題。下面將介紹每種格式。

 

5.3.1 記錄批(Record Batch)

以下是記錄批(RecordBatch)的磁碟格式:

 

baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]

需要注意的是,當啟用壓縮功能時,壓縮後的數據直接緊跟在記錄總數後邊。

CRC涵蓋了從屬性到批次結束的數據(即CRC之後的所有字節)。它位於魔法字節之後,這意味著客戶端必須先解析魔法字節,然後再決定如何解釋批次長度和魔法字節之間的字節。在CRC計算中不包括分區首領時間欄位,以避免當該欄位被分配給經紀商接收的每個批次時,需要重新計算CRC。計算時採用CRC-32C(Castagnoli)多項式。

關於壓實:與舊的消息格式不同,magic v2及以上版本在清理日誌時保留了原始批次的首尾偏移量/序列號。這是為了在重新加載日誌時能夠恢復生產者的狀態而需要的。例如,如果我們沒有保留最後一個序列號,那麼在分區領導失敗後,生產者可能會看到一個OutOfSequence錯誤。為了進行重複檢查,必須保留基本序列號(broker通過驗證傳入批次的第一個和最後一個序列號與該生產者的最後一個序列號相匹配來檢查傳入的Produce請求是否有重複)。因此,當批中的所有記錄都被清理,但為了保留生產者的最後一個序列號,批仍被保留時,日誌中就有可能出現空批。這裡有一個奇怪的地方,就是在壓實過程中不保留firstTimestamp欄位,所以如果批中的第一條記錄被壓實掉,它就會改變。

 

5.3.1.1 控制批次

一個控制批包含一條記錄,稱為控制記錄。控制記錄不應傳遞給應用程式。相反,它們被消費者用來過濾掉中止的事務性消息。

控制記錄的鍵符合以下模式。

version: int16 (current version is 0)
type: int16 (0 indicates an abort marker, 1 indicates a commit)

控制記錄的值的模式取決於類型。該值對客戶來說是不透明的。

 

5.3.2 記錄

Kafka 0.11.0中引入了記錄級頭文件。帶頭記錄的磁碟格式如下。

 

length: varint
attributes: int8
bit 0~7: unused
timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]

 

 

5.3.2.1 記錄頭

headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]

 

 

我們使用與Protobuf相同的varint編碼。關於後者的更多信息可以在這裡找到。記錄中的頭數也是以varint編碼的。

 

5.3.3 舊的信息格式

在Kafka 0.11之前,消息是以消息集的形式傳輸和存儲的。在消息集中,每個消息都有自己的元數據。需要注意的是,雖然消息集以數組的形式表示,但並不像協議中的其他數組元素那樣,前面有一個int32數組大小。

 

消息集合:

MessageSet (Version: 0) => [offset message_size message]
offset => INT64
message_size => INT32
message => crc magic_byte attributes key value
crc => INT32
magic_byte => INT8
attributes => INT8
bit 0~2:
0: no compression
1: gzip
2: snappy
bit 3~7: unused
key => BYTES
value => BYTES


 

MessageSet (Version: 1) => [offset message_size message]
offset => INT64
message_size => INT32
message => crc magic_byte attributes timestamp key value
crc => INT32
magic_byte => INT8
attributes => INT8
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
bit 3: timestampType
0: create time
1: log append time
bit 4~7: unused
timestamp => INT64
key => BYTES
value => BYTES

 

在Kafka 0.10之前的版本中,唯一支持的消息格式版本(用魔力值表示)是0,0.10版本中引入了消息格式版本1,並支持時間戳。

在消息格式版本0和1中,Kafka支持遞歸消息以實現壓縮。在這種情況下,消息的屬性必須被設置為指示壓縮類型之一,並且值域將包含用該類型壓縮的消息集。我們通常將嵌套消息稱為 "內部消息",將封裝消息稱為 "外部消息"。請注意,外層消息的鍵應該是空的,其偏移量將是最後一個內部消息的偏移量。

當接收遞歸的版本0消息時,broker會對它們進行解壓,並且每個內部消息都會單獨分配一個偏移量。在版本1中,為了避免伺服器端重新壓縮,只有包裝消息將被分配一個偏移量。內部消息將有相對的偏移量。絕對偏移量可以使用外層消息的偏移量來計算,它對應於分配給最後一個內部消息的偏移量。

crc欄位包含後續報文字節的CRC32(而不是CRC-32C)(即從魔法字節到值)。

 

5.4 日誌

一個名為 "my_topic "的主題的日誌有兩個分區,由兩個目錄(即my_topic_0和my_topic_1)組成,這兩個目錄中包含了該主題的消息數據文件。日誌文件的格式是一個 "日誌條目 "的序列;每個日誌條目是一個4位元組的整數N,存儲消息長度,後面是N個消息字節。每條消息由一個64位整數偏移量唯一標識,給出該消息在該分區上所有發送到該主題的消息流中開始的字節位置。每條消息的磁碟格式如下。每個日誌文件都以它所包含的第一條消息的偏移量命名。所以創建的第一個文件將是000000000.kafka,每一個額外的文件將有一個整數名,大約是前一個文件的S字節,其中S是配置中給出的最大日誌文件大小。

 

記錄的確切二進位格式是版本化的,並作為標準接口進行維護,因此在理想的情況下,記錄批次可以在生產者、broker和客戶端之間傳輸,而無需重新複製或轉換。上一節包括了關於記錄的磁碟格式的細節。

 

使用消息偏移量作為消息id是不尋常的。我們最初的想法是使用生產者生成的 GUID,並在每個broker上維護一個從 GUID 到偏移量的映射。但由於消費者必須為每個伺服器維護一個ID,所以GUID的全局唯一性沒有提供任何價值。此外,維護從隨機id到偏移量的映射的複雜性,需要一個重磅的索引結構,而這個結構必須與磁碟同步,本質上需要一個完整的持久化隨機訪問數據結構。因此為了簡化查找結構,我們決定使用一個簡單的每個分區原子計數器,它可以與分區id和節點id耦合,以唯一標識一個消息;這使得查找結構更簡單,儘管每個消費者請求仍有可能進行多次查找。然而,一旦我們確定了計數器,直接使用偏移量似乎是很自然的--畢竟兩者都是分區唯一的單調遞增整數。由於偏移量被隱藏在消費者API中,這個決定最終是一個實現細節,我們選擇了更有效的方法。

 

 

 

 寫

日誌允許串行追加,它總是轉到最後一個文件。當這個文件達到一個可配置的大小(比如1GB)時,就會轉到一個新的文件。日誌需要兩個配置參數。M,給出了在強制作業系統將文件刷新到磁碟之前要寫的信息數量,S,給出了強制刷新的秒數。這樣可以保證在系統崩潰時最多丟失M條消息或S秒數據的耐久性。

 

讀取是通過給定消息的64位邏輯偏移量和S字節的最大分塊大小來完成的,這將返回一個S字節緩衝區中的消息迭代器。這將返回一個迭代器,遍歷S字節緩衝區中包含的消息。S 字節的大小要大於任何一條消息,但如果消息異常大,可以多次重試讀取,每次都將緩衝區大小加倍,直到消息被成功讀取。可以指定一個最大的消息和緩衝區大小,以使伺服器拒絕大於某個大小的消息,並給客戶端一個最大的約束,它需要讀取一個完整的消息。讀取的緩衝區很可能以部分消息結束,這很容易通過大小分隔來檢測。

 

從偏移量讀取的實際過程,需要先定位數據存放的日誌段文件,從全局偏移量值中計算出特定文件的偏移量,然後從該文件偏移量中讀取。搜索是以簡單的二進位搜索變化的方式,針對每個文件維護的內存範圍進行的。

 

日誌提供了獲取最近寫的消息的功能,以便客戶從 "現在 "開始訂閱。在消費者未能在其SLA規定的天數內消費其數據的情況下,這也很有用。在這種情況下,當客戶機試圖消費一個不存在的偏移量時,它將得到一個OutOfRangeException,並且可以根據用例的情況重置自己或失敗。

 

以下是發送給消費者的結果的格式。

 

MessageSetSend (fetch result)

total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes



MultiMessageSetSend (multiFetch result)

total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n

 

 

刪除

每次刪除一個日誌段的數據。日誌管理器應用兩個指標來確定符合刪除條件的片段:時間和大小。對於基於時間的策略,會考慮記錄的時間戳,由分段文件中最大的時間戳(記錄順序無關)定義整個分段的保留時間。基於大小的保留默認為禁用。啟用時,日誌管理器會不斷刪除最舊的段文件,直到分區的整體大小再次在配置的限制範圍內。如果同時啟用了這兩個策略,則會刪除因任一策略而符合刪除條件的段。為了避免鎖定讀取,同時仍然允許修改段列表的刪除,我們使用了一種複製-寫式的段列表實現,它提供了一致的視圖,允許在刪除進行時,在日誌段的不可改變的靜態快照視圖上進行二進位搜索。

 

保障

日誌提供了一個配置參數M,它控制了在強制刷新到磁碟之前寫入的最大消息數量。在啟動時,會運行一個日誌恢復進程,該進程會遍歷最新日誌段中的所有消息,並驗證每個消息條目是否有效。如果消息的大小和偏移量之和小於文件的長度,並且消息有效載荷的CRC32與消息存儲的CRC相匹配,則消息條目有效。如果檢測到損壞,日誌會被截斷到最後一個有效的偏移量。

 

請注意,必須處理兩種腐敗:截斷,其中一個未寫的塊由於崩潰而丟失,以及一個無意義的塊被添加到文件中的腐敗。原因是一般情況下,作業系統不保證文件inode和實際塊數據之間的寫入順序,所以如果inode更新了新的大小,但在寫入包含該數據的塊之前發生了崩潰,那麼除了丟失寫入數據外,文件還可能獲得無意義數據。CRC可以檢測到這種角落的情況,並防止它破壞日誌(當然,未寫入的信息會丟失)。

 

5.5 分配消費者偏移量追蹤

Kafka消費者跟蹤它在每個分區中消耗的最大偏移量,並具有提交偏移量的能力,以便在重新啟動時可以從這些偏移量恢復。Kafka提供了一個選項,可以將某個消費者組的所有偏移量存儲在一個指定的broker(針對該組)中,稱為組協調器.也就是說,該消費者組中的任何消費者實例都應該將其偏移量提交和取回發送到該組協調器(broker)。消費者組根據其組名被分配給協調器。消費者可以通過向任何Kafka broker發出FindCoordinatorRequest並讀取FindCoordinatorResponse來查找它的協調人,FindCoordinatorResponse將包含協調人的詳細信息。然後,消費者可以繼續從協調人經紀商那裡提交或獲取偏移量。如果協調器移動了,消費者需要重新發現協調器。偏移提交可以由消費者實例自動或手動完成。

 

當組協調器收到OffsetCommitRequest時,它會將請求附加到一個名為__consumer_offsets的特殊壓縮Kafka主題中。只有在offsets主題的所有副本都收到offsets之後,代理才會向消費者發送一個成功的偏移提交響應。如果偏移量未能在可配置的超時內複製,偏移量提交將失敗,消費者可以在後退後重新嘗試提交。broker會定期壓縮偏移主題,因為它只需要維護每個分區的最新偏移提交。協調器還將偏移量緩存在內存表中,以便快速服務於偏移量的獲取。

 

當協調器收到一個偏移獲取請求時,它只需從偏移緩存中返回最後提交的偏移向量。在協調器剛剛啟動的情況下,或者如果它剛剛成為一組新的消費者組的協調器(通過成為偏移量主題分區的領導者),它可能需要將偏移量主題分區加載到緩存中。在這種情況下,偏移量的獲取將以CoordinatorLoadInProgressException失敗,消費者可能會在後退後重新嘗試OffsetFetchRequest。

 

Zookeeper目錄

下面給出了ZooKeeper結構和用於協調消費者和broker的算法。

 

符號

當一個路徑中的元素用[xyz]表示時,意味著xyz的值不是固定的,實際上xyz的每一個可能的值都有一個ZooKeeper znode。例如/topics/[topic]將是一個名為/topics的目錄,其中包含了每個主題名的子目錄。還給出了數字範圍,如[0...5]來表示子目錄0,1,2,3,4。箭頭->用來表示一個znode的內容。例如,/hello -> world表示一個包含 "world "的znode /hello。

 

broker節點註冊

/brokers/ids/[0...N] --> {"jmx_port":...,"timestamp":...,"endpoints":[...],"host":...,"version":...,"port":...} (ephemeral node)

 

這是所有存在的 broker 節點的列表,每個節點都提供了一個唯一的邏輯 broker id,它可以向消費者標識它(必須作為其配置的一部分給出)。在啟動時,broker 節點通過在 /brokers/ids 下創建一個帶有邏輯 broker id 的 znode 來註冊自己。邏輯 broker id 的目的是允許 broker 移動到不同的物理機器上,而不影響消費者。試圖註冊一個已經在使用的broker id(比如說因為兩臺伺服器配置了相同的broker id)會導致錯誤。

由於broker在ZooKeeper中使用短暫的znodes註冊自己,這個註冊是動態的,如果broker被關閉或死亡,這個註冊就會消失(從而通知消費者它不再可用)。

 

broker主題註冊

/brokers/topics/[topic]/partitions/[0...N]/state --> {"controller_epoch":...,"leader":...,"version":...,"leader_epoch":...,"isr":[...]} (ephemeral node)

 

每個broker在其維護的主題下註冊自己,並存儲該主題的分區數量。

 集群ID

群集id是分配給Kafka群集的唯一且不可更改的標識符。群集id最多可以有22個字符,允許的字符由正則表達式[a-zA-Z0-9_/9/\-]+定義,它對應於沒有填充的URL安全Base64變體所使用的字符。從概念上來說,它是在集群第一次啟動時自動生成的。

 

在實現上,它是在第一次成功啟動0.10.1或更高版本的broker時生成的。broker在啟動過程中會嘗試從/cluster/id znode中獲取集群id。如果znode不存在,broker會生成一個新的集群id,並使用這個集群id創建znode。

broker節點註冊

broker節點基本上是獨立的,所以它們只發布自己所擁有的信息。當一個broker加入時,它在broker節點註冊表目錄下註冊自己,並寫入自己的主機名和埠信息。broker也會在broker主題註冊表中註冊現有主題的列表及其邏輯分區。新的主題在broker上創建時是動態註冊的。

 

6.操作

以下是基於LinkedIn的使用情況和經驗,關於將Kafka作為生產系統實際運行的一些信息。請將您知道的任何其他技巧發送給我們。

6.1 Kafka基本操作

本節將回顧您將在Kafka集群上執行的最常見的操作。本節回顧的所有工具都可以在Kafka發行版的bin/目錄下使用,如果運行時沒有參數,每個工具都會列印所有可能的命令行選項的詳細信息。

添加和刪除主題

您可以選擇手動添加主題,或在數據首次發布到不存在的主題時讓它們自動創建。如果主題是自動創建的,那麼你可能要調整用於自動創建主題的默認主題配置。
使用主題工具添加和修改主題。

 

> bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y

複製因子控制有多少臺伺服器會複製每條被寫入的消息。如果您的複製因子為3,那麼在您失去對數據的訪問之前,最多可以有2臺伺服器發生故障。我們建議您使用2或3的複製因子,這樣您就可以在不中斷數據消耗的情況下透明地跳轉機器。
分區數控制主題將被分片成多少個日誌。分區數有幾個影響。首先每個分區必須完全適合在一臺伺服器上。因此,如果你有20個分區,那麼全部數據集(以及讀寫負載)將由不超過20臺伺服器處理(不計算副本)。最後,分區數會影響你的消費者的最大並行性。這在概念部分有更詳細的討論。

每個分片分區的日誌都會被放置到Kafka日誌目錄下自己的文件夾中。這種文件夾的名稱由主題名稱、破折號(-)和分區id組成。由於一個典型的文件夾名不能超過255個字符,所以主題名的長度會有限制。我們假設分區的數量永遠不會超過100,000。因此,主題名不能超過249個字符。這就為文件夾名中的破折號和可能長達 5 位的分區 ID 留出了足夠的空間。

在命令行上添加的配置覆蓋了伺服器的默認設置,比如數據應該保留的時間長度。完整的按主題配置集在這裡有記載。

 

修改主題

您可以使用同一主題工具更改主題的配置或分區。
要添加分區,您可以執行以下操作

 

> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
--partitions 40


要知道,分區的一個用例是對數據進行語義上的分區,而添加分區並不會改變現有數據的分區,所以如果消費者依賴該分區,這可能會打擾到他們。也就是說,如果數據是按照hash(key) % number_of_partitions進行分區的,那麼這個分區將有可能通過添加分區來洗牌,但Kafka不會嘗試以任何方式自動重新分配數據。
要添加配置:

> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y

 

要刪除一個配置:

> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x

 

最終刪除一個主題:

> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name

 

Kafka目前不支持減少一個主題的分區數量。

更改主題的複製因子的說明可以在這裡找到。

 

優雅地關機

Kafka集群將自動檢測任何broker關閉或故障,並為該機器上的分區選舉新的領導者。無論伺服器發生故障,還是因維護或配置更改而故意宕機,都會發生這種情況。對於後一種情況,Kafka支持一種更優雅的機制來停止伺服器,而不是直接殺死它。當一個伺服器被優雅地停止時,它有兩個優化,它會利用這兩個優化。

它會將所有的日誌同步到磁碟上,以避免在重新啟動時需要進行任何日誌恢復(即驗證日誌尾部所有消息的校驗和)。日誌恢復需要時間,所以這可以加快有意重啟的速度。

它將在關閉之前把伺服器是領導的任何分區遷移到其他副本。這將使領導權轉移更快,並將每個分區不可用的時間減少到幾毫秒。

除硬殺外,每當伺服器停止時,同步日誌將自動發生,但受控領導力遷移需要使用特殊設置:

controlled.shutdown.enable=true

請注意,只有當所有託管在broker上的分區都有副本時,受控關閉才會成功(即複製因子大於1,並且這些副本中至少有一個是活的)。這通常是你想要的,因為關閉最後一個副本會使該主題分區不可用。

 

平衡領導權

每當一個broker停止或崩潰時,該broker的分區的領導權就會轉移到其他副本上。當broker重新啟動時,它將只成為其所有分區的跟隨者,這意味著它將不會被用於客戶端讀寫。
為了避免這種不平衡,Kafka有一個首選副本的概念。如果一個分區的副本列表是1,5,9,那麼節點1作為領導者比節點5或9都要優先,因為它在副本列表中更早。默認情況下,Kafka集群會嘗試將領導權恢復給被恢復的副本。這種行為是通過以下方式配置的。

 

auto.leader.rebalance.enable=true

 

您也可以將此設置為false,但您需要通過運行命令手動將領導力恢復到恢復的副本。

> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

 

平衡各機架上的副本

機架感知功能將同一分區的副本分布在不同機架上。這將Kafka為broker故障提供的保證擴展到覆蓋機架故障,限制了一個機架上所有broker同時故障時的數據丟失風險。該功能還可以應用於其他經紀商分組,如EC2中的可用性區。


您可以通過在broker config中添加一個屬性來指定某個broker屬於某個機架:

broker.rack=my-rack-id

 

當一個主題被創建、修改或副本被重新分配時,機架約束將被尊重,確保副本跨越儘可能多的機架(一個分區將跨越min(#racks, replication-factor)不同的機架)。
用於將副本分配給broker的算法確保每個broker的leader數量不變,無論broker如何分布在機架上。這確保了均衡的吞吐量。
然而,如果機架上分配了不同數量的broker,那麼複製件的分配將不會是均勻的。擁有較少broker的機架將獲得更多的複製,這意味著它們將使用更多的存儲並將更多的資源投入到複製中。因此,每個機架配置相同數量的broker是明智的。

 

集群之間的數據鏡像和地理複製

Kafka管理員可以定義跨越單個Kafka集群、數據中心或地理區域邊界的數據流。更多信息請參考 "地理複製 "一節。

 

檢查消費者位置

有時候,看看你的消費者的位置是很有用的。我們有一個工具,可以顯示一個消費群中所有消費者的位置,以及他們在日誌末端的落後程度。如果要在一個名為my-group的消費群組上運行這個工具,消費一個名為my-topic的主題,會是這樣的。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2

 

管理消費者組

通過ConsumerGroupCommand工具,我們可以列出、描述或刪除消費者組。消費者組可以手動刪除,也可以在該組最後一次提交的偏移量到期時自動刪除。只有當該組沒有任何活動成員時,手動刪除才有效。例如,要列出所有主題的所有消費者組。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

test-consumer-group

 

如前所述,為了查看偏移量,我們是這樣 "描述 "消費群體的。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4

 

有一些額外的 "描述 "選項可以用來提供有關消費者群體的更多詳細信息。
--members  該選項提供消費群中所有活躍成員的清單:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members

CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0

--members--verbose   除了上述"--成員 "選項報告的信息外,該選項還提供分配給每個成員的分區。 

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose

CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -

--offsets。這是默認的describe選項,提供與"--describe "選項相同的輸出。
--state:這個選項提供了有用的組級信息。這個選項提供了有用的組級信息。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state

COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
localhost:9092 (0) range Stable 4

要手動刪除一個或多個消費群,可使用"--刪除 "選項。

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

 

要重置一個消費群的偏移量,可以使用"--reset-offsets "選項。這個選項一次只支持一個消費群。它需要定義以下範圍:---all-topics 或 ---topic。必須選擇一個範圍,除非你使用"--from-file "方案。另外,首先確保消費者實例是非活動的。更多細節請參見KIP-122。

它有3個執行選項。

(default)顯示要重置的偏移量。

--execute : 執行 --reset-offsets 進程。

--export : 將結果導出為CSV格式。

--reset-offsets也有以下方案可供選擇(必須至少選擇一個方案)。

--to-dateatetime <String: datetime> : 將偏移量重置為日期時間的偏移量。格式:'YYYY-MM-DD'。'YYYY-MM-DDTHH:MM:SS.sss' 。

--toearliest :將偏移量重置為最早的偏移量。

--to-latest : 將偏移量重置為最早的偏移量。將偏移量重置為最新的偏移量。

--shift-by <Long: number-of-offsets> : 重置偏移量,將當前偏移量移動'n',其中'n'可以是正或負。

--from-file : 將偏移量重置為CSV文件中定義的值。

--to-current : 將偏移量重置為當前偏移量。將偏移量重置為當前的偏移量。

--by-duration <String: duration> : 將偏移量重置為當前時間戳的持續時間的偏移量。格式:'PnDTnHnM'。'PnDTnHnMnS'。

--to-offset : 將偏移量重置為一個特定的偏移量。

請注意,超出範圍的偏移將被調整到可用的偏移端。例如,如果偏移量在10,而偏移量的請求是15,那麼,實際上將選擇10的偏移量。
例如,將消費者組的偏移量重置為最新的偏移量:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

TOPIC PARTITION NEW-OFFSET
topic1 0 0

如果你使用舊的高級消費者,並將組元數據存儲在ZooKeeper中(即offsets.storage=zookeeper),傳遞--zookeeper而不是--bootstrap-server。

> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

 

集群擴展

將伺服器添加到Kafka集群很簡單,只需給它們分配一個唯一的broker id,然後在新伺服器上啟動Kafka。然而這些新伺服器不會自動分配任何數據分區,所以除非分區被移動到它們身上,否則它們不會做任何工作,直到新的主題被創建。因此,通常當你將機器添加到你的集群時,你會希望將一些現有的數據遷移到這些機器上。
遷移數據的過程是手動發起的,但完全自動化。在掩護下發生的事情是,Kafka會將新伺服器添加為它要遷移的分區的跟隨者,並允許它完全複製該分區中的現有數據。當新伺服器完全複製了這個分區的內容,並加入了同步副本後,其中一個現有的副本將刪除其分區的數據。

分區重新分配工具可以用來在不同的經紀商之間移動分區。理想的分區分布將確保所有經紀商的數據負載和分區大小均勻。分區重新分配工具沒有能力自動研究Kafka集群中的數據分布,並移動分區以達到均勻的負載分布。因此,管理員必須弄清楚哪些主題或分區應該被移動。

分區重新分配工具可以在3種相互排斥的模式下運行。

--生成模式 在這種模式下,給定一個主題列表和一個broker列表,該工具會生成一個候選的重新分配,將指定主題的所有分區移動到新的broker上。這個選項只是提供了一種方便的方式來生成給定主題和目標broker列表的分區重新分配計劃。

--執行。在這種模式下,工具會根據用戶提供的重新分配計劃啟動分區的重新分配。(使用--reassignment-js-on-file選項)。這可以是管理員手工製作的自定義重新分配計劃,也可以是使用--generate選項提供的。

--驗證:在該模式下,該工具將驗證上次 --執行期間列出的所有分區的重新分配狀態。狀態可以是成功完成、失敗或正在進行。

 

自動將數據遷移到新機器上

分區重新分配工具可用於將一些主題從當前的經紀商集合中移到新添加的經紀商中。這在擴展現有群集時通常很有用,因為將整個主題移動到新的經紀商集合中比一次移動一個分區更容易。當用來做這件事時,用戶應該提供一個應該移動到新的經紀商集的主題列表和一個新經紀商的目標列表。然後,該工具會將給定主題列表的所有分區均勻地分配到新的經紀商集上。在這個移動過程中,主題的複製因子保持不變。實際上,輸入的主題列表的所有分區的複製都從舊的經紀商集移動到新增加的經紀商。
例如,下面的例子將把主題foo1,foo2的所有分區移動到新的經紀商5,6集。在移動結束後,主題foo1和foo2的所有分區將只存在於broker5,6上。

由於該工具接受輸入的主題列表為 json 文件,因此首先需要確定要移動的主題,並創建 json 文件,如下所示:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}

 

 

一旦json文件準備好,使用分區重分配工具生成候選分配。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

 

該工具會生成一個候選任務,將所有分區從主題foo1,foo2移動到broker5,6。但請注意,此時分區移動還沒有開始,它只是告訴你當前的分配和建議的新分配。當前的分配應該被保存,以防你想回滾到它。新的分配應該保存在一個json文件中(例如expand-cluster-reassignment.json),以便用--execute選項輸入到工具中,如下所示。

 

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

 

最後,--verify選項可以與該工具一起使用,以檢查分區重新分配的狀態。請注意,在使用--verify選項時,應該使用相同的expand-cluster-reassignment.json(與--execute選項一起使用)。

 

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

 

自定義分區分配和遷移

分區重新分配工具還可用於有選擇地將一個分區的副本移至一組特定的broker。當以這種方式使用時,假定用戶知道重新分配計劃,不需要該工具生成候選的重新分配,有效地跳過--生成步驟,直接進入--執行步驟。
例如,下面的例子將topic foo1的分區0移動到broker 5,6,將topic foo2的分區1移動到broker 2,3。

第一步是在json文件中手工製作自定義的重新分配計劃:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然後,使用帶有--execute選項的json文件開始重新分配過程。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

 

--verify選項可用於工具檢查分區重新分配的狀態。請注意,在使用--驗證選項時,應該使用相同的自定義重新分配.json(與--執行選項一起使用)。

 

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully

 

退役的broker

分區重新分配工具尚不具備為停用的代理自動生成重新分配計劃的能力。因此,管理員必須提出一個重新分配計劃,將託管在要停用的broker上的所有分區的副本轉移到其他broker上。這可能是比較繁瑣的,因為重新分配需要確保所有的副本不會從退役的broker移動到只有一個其他broker。為了使這一過程不費吹灰之力,我們計劃在未來為退役broker增加工具支持。

增加複製因子

增加現有分區的複製因子很容易。只需在自定義的重新分配json文件中指定額外的複製因子,並與--execute選項一起使用,就可以增加指定分區的複製因子。


例如,下面的例子將topic foo的分區0的複製因子從1增加到3,在增加複製因子之前,該分區的唯一副本存在於broker 5上。作為增加複製因子的一部分,我們將在broker6和7上增加更多的副本。

 

第一步是在json文件中手工製作自定義的重新分配計劃。

 

> cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

然後,使用帶有--execute選項的json文件開始重新分配過程。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

 

--verify選項可以與該工具一起使用,以檢查分區重新分配的狀態。請注意,在使用--驗證選項時,應該使用相同的遞增複製因子.json(與--execute選項一起使用)。

 

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully

 

你也可以用kafka-topics工具驗證複製因子的增加。

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7

限制數據遷移期間的帶寬使用

Kafka讓你可以對複製流量進行節流,對用於將複製從機器移動到機器的帶寬設置一個上限。這在重新平衡集群、引導新的代理或添加或刪除代理時非常有用,因為它限制了這些數據密集型操作對用戶的影響。
有兩個接口可以用來參與節流。最簡單,也是最安全的是在調用kafka-reassign-partitions.sh時應用節流,但kafka-configs.sh也可以直接用來查看和改變節流值。
所以舉例來說,如果你要執行重新分配,用下面的命令,它將以不超過50MB/s的速度移動分區。

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000

 

當你執行這個腳本時,你會看到節氣門嚙合。

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果你想在重新平衡過程中改變節流閥,比如說增加吞吐量以使其完成得更快,你可以通過重新運行傳遞相同的重新分配-js-文件的執行命令來實現。

$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s

 

一旦再平衡完成,管理員可以使用--驗證選項檢查再平衡的狀態。如果再平衡已經完成,將通過--驗證命令移除節流閥。重要的是,管理員在再平衡完成後,要通過運行帶有--verify選項的命令及時移除節流閥。如果不這樣做,可能會導致常規複製流量被節流。

當執行--verify選項,並且重新分配完成後,腳本將確認節流被移除。

> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.

管理員也可以使用kafka-configs.sh來驗證分配的配置。有兩對節流配置用於管理節流過程。第一對是指節流值本身。這是在broker層面,使用動態屬性進行配置的。

 

leader.replication.throttled.rate
follower.replication.throttled.rate

 

然後是配置對列舉的節制複製集。

 

leader.replication.throttled.replicas
follower.replication.throttled.replicas

 

其中每個主題的配置值。

所有四個配置值都是由kafka-reassign-partitions.sh自動分配的(下面討論)。

要查看節流限制配置。

 

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

 

這顯示了應用於複製協議的領導者和跟隨者端的節流。默認情況下,兩邊都被分配了相同的節流吞吐量值。

要查看節流複製的列表。

> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102

 

在這裡,我們看到領導節流被應用於經紀商102上的分區1和經紀商101上的分區0。同樣的,跟隨者節制也被應用於broker 101上的分區1和broker 102上的分區0。

默認情況下,kafka-reassign-partitions.sh會將leader節流應用到所有在重新平衡之前存在的複製,其中任何一個複製都可能是leader。它將對所有移動目的地應用follower節制。因此,如果在經紀商 101,102 上有一個複製的分區,被重新分配到 102,103,那麼該分區的領導者節流將被應用到 101,102,而跟隨者節流將只應用到 103。

如果需要,你也可以使用kafka-configs.sh上的--alter開關來手動改變節流配置。

安全使用節流複製

在使用節流複製時,應注意一些問題。尤其是

(1)節流器的移除。

重新分配完成後,應及時移除節流器(通過運行kafka-reassign-partitions.sh--verify)。
(2)保證進度。

如果節流閥設置得太低,與傳入的寫入速率相比,有可能導致複製沒有進展。這種情況發生在以下情況。

 

max(BytesInPerSec) > throttle

 

其中BytesInPerSec是監控生產者向每個broker寫入吞吐量的度量。

管理員可以在重新平衡期間,使用該指標監控複製是否有進展。

 

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

 

在複製過程中,滯後應該不斷減少。如果該指標沒有減少,管理員應如上所述增加節流吞吐量。

 

設置配額

配額覆蓋和默認值可以在(用戶、客戶端-id)、用戶或客戶端-id級別進行配置,如這裡所述。默認情況下,客戶端獲得的配額是無限的。可以為每個(用戶、客戶機id)、用戶或客戶機id組設置自定義配額。
為(user=user1,client-id=clientA)配置自定義配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

 

為user=user1配置自定義配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.

 

為client-id=clientA配置自定義配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.

通過指定--entity-default選項而不是--entity-name,可以為每個(用戶、client-id)、用戶或client-id組設置默認配額。
為user=userA配置默認的client-id配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.


配置用戶的默認配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.


配置client-id的默認配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.


下面是如何描述一個給定的(用戶,client-id)的配額。

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

描述給定用戶的配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

 

描述給定客戶ID的配額:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

 

如果沒有指定實體名稱,則描述指定類型的所有實體。例如,描述所有用戶:

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

 

同理,對於(用戶,客戶端):

> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

 

通過在broker上設置這些配置,可以設置適用於所有客戶端ID的默認配額。只有在Zookeeper中沒有配置配額覆蓋或默認值時,才會應用這些屬性。默認情況下,每個客戶機id都會收到一個無限的配額。下面將每個生產者和消費者客戶端-id的默認配額設置為10MB/秒。

quota.producer.default=10485760
quota.consumer.default=10485760

 

請注意,這些屬性已經被廢棄,可能會在未來的版本中被刪除。使用kafka-configs.sh配置的默認值優先於這些屬性。

 

6.2 數據中心

一些部署將需要管理一個跨越多個數據中心的數據管道。我們推薦的方法是在每個數據中心部署一個本地的Kafka集群,每個數據中心的應用實例僅與其本地集群進行交互,並在集群之間進行數據鏡像(如何做到這一點,請參見關於地理複製的文檔)。
這種部署模式允許數據中心作為獨立的實體行事,並允許我們集中管理和調整數據中心間的複製。這允許每個設施獨立運行,即使數據中心間鏈路不可用:當發生這種情況時,鏡像會落後,直到鏈路恢復時才會趕上。

對於需要對所有數據進行全局查看的應用,您可以使用鏡像提供集群,該集群的數據匯總從所有數據中心的本地集群中鏡像出來。這些聚合集群用於需要完整數據集的應用的讀取。

這不是唯一可能的部署模式。通過廣域網從遠程Kafka集群讀取或寫入也是可能的,不過很明顯,這會增加獲取集群所需的任何延遲。

Kafka自然地在生產者和消費者中都會對數據進行批處理,因此即使在高延遲的連接上也能實現高吞吐量。不過為了允許這樣做,可能需要使用socket.send.buffer.bytes和socket.receive.buffer.bytes配置來增加生產者、消費者和broker的TCP socket緩衝區大小。這裡記載了適當的設置方法。

一般來說,不建議在高延遲鏈路上運行跨越多個數據中心的單個Kafka集群。這將導致Kafka寫入和ZooKeeper寫入的複製延遲非常高,而且如果位置之間的網絡不可用,Kafka和ZooKeeper都不會在所有位置保持可用。

 

6.3 地理複製(跨集群數據鏡像)地理複製概述

Kafka管理員可以定義跨越單個Kafka集群、數據中心或地理區域邊界的數據流。這種事件流的設置通常是出於組織、技術或法律要求的需要。常見的場景包括

地理複製

災難恢復

將邊緣集群送入一個中央的聚合集群中

集群的物理隔離(如生產與測試)。

雲遷移或混合雲部署

法律和合規要求

管理員可以通過Kafka的MirrorMaker(第2版)來設置這樣的集群間數據流,MirrorMaker是一個在不同Kafka環境之間以流式方式複製數據的工具。MirrorMaker建立在Kafka Connect框架之上,支持以下功能。

注意:使用MirrorMaker的Geo-replication可以跨Kafka集群複製數據。這種集群間的複製與Kafka的集群內複製不同,後者在同一個Kafka集群內複製數據。

 

什麼是複製流

通過MirrorMaker,Kafka管理員可以將主題、主題配置、消費者組及其偏移量和ACL從一個或多個源Kafka集群複製到一個或多個目標Kafka集群,即跨集群環境。簡而言之,MirrorMaker 使用 Connectors 從源集群消費,並生產到目標集群。

這些從源集群到目標集群的定向流稱為複製流。它們在MirrorMaker配置文件中以{source_cluster}->{target_cluster}的格式進行定義,後面會介紹。管理員可以根據這些流創建複雜的複製拓撲。

以下是一些示例模式。

Active/Active高可用性部署。A->B,B->A

主動/被動或主動/備用高可用性部署。A->B

聚集(如從許多簇到一個簇)。A->K,B->K,C->K。

扇出(如從一簇到多簇)。K->A,K->B,K->C。

轉發:A->B、B->C、C->D A->B,B->C,C->D。

默認情況下,一個流會複製所有主題和消費者組。但是,可以對每個複製流進行獨立配置。例如,您可以定義只將特定的主題或消費者組從源群集複製到目標群集。

下面是關於如何配置數據從主群集複製到輔助群集(主動/被動設置)的第一個例子。

 

# Basic settings
clusters = primary, secondary
primary.bootstrap.servers = broker3-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092

# Define replication flows
primary->secondary.enable = true
primary->secondary.topics = foobar-topic, quux-.*

 

配置地理複製

以下部分描述了如何配置和運行專用的MirrorMaker集群。如果您想在現有的 Kafka Connect 集群或其他支持的部署設置中運行 MirrorMaker,請參考 KIP-382: MirrorMaker 2.0,並注意不同部署模式的配置設置名稱可能有所不同。

除了以下章節所涉及的內容外,還可在以下網址獲得有關配置設置的進一步示例和信息。

MirrorMakerConfig,MirrorConnectorConfig。

預設主題過濾(DefaultTopicFilter),預設組過濾(DefaultGroupFilter)。

connect-mirror-maker.properties中的配置設置示例,KIP-382。MirrorMaker 2.0

 

配置文件語法

MirrorMaker配置文件通常命名為connect-mirror-maker.properties。你可以在這個文件中配置各種組件。

例子。定義MirrorMaker設置(後面會詳細解釋)。

 

# Global settings
clusters = us-west, us-east # defines cluster aliases
us-west.bootstrap.servers = broker3-west:9092
us-east.bootstrap.servers = broker5-east:9092

topics = .* # all topics to be replicated by default

# Specific replication flow settings (here: flow from us-west to us-east)
us-west->us-east.enable = true
us-west->us.east.topics = foo.*, bar.* # override the default above

 

MirrorMaker是基於Kafka Connect框架的。在Kafka Connect的文檔章節中描述的任何Kafka Connect、source connector和sink connector設置都可以直接在MirrorMaker配置中使用,而無需更改或前綴配置設置的名稱。

例子 定義自定義的Kafka Connect設置,供MirrorMaker使用。

 

# Setting Kafka Connect defaults for MirrorMaker
tasks.max = 5

 

除了tasks.max之外,大多數默認的Kafka Connect設置在MirrorMaker開箱即用的情況下都能正常工作。為了在多個MirrorMaker進程中均勻分配工作負載,建議根據可用的硬體資源和要複製的主題分區總數,將tasks.max設置為至少2(最好更高)。

您可以進一步自定義MirrorMaker按源集群或目標集群的Kafka Connect設置(更準確地說,您可以 "按連接器 "指定Kafka Connect worker級別的配置設置)。在MirrorMaker配置文件中使用{cluster}.{config_name}的格式。

例子 :為us-west集群定義自定義連接器設置:

 

# us-west custom settings
us-west.offset.storage.topic = my-mirrormaker-offsets

 

MirrorMaker內部使用Kafka生產者、消費者和管理員客戶端。經常需要對這些客戶端進行自定義設置。要覆蓋默認值,請在MirrorMaker配置文件中使用以下格式。

{source}.consumer.{consumer_config_name}。

{target}.producer.{producer_config_name}。

{source_or_target}.admin.{admin_config_name}。

例子:定義自定義生產者、消費者、管理員客戶端的設置:

# us-west cluster (from which to consume)
us-west.consumer.isolation.level = read_committed
us-west.admin.bootstrap.servers = broker57-primary:9092

# us-east cluster (to which to produce)
us-east.producer.compression.type = gzip
us-east.producer.buffer.memory = 32768
us-east.admin.bootstrap.servers = broker8-secondary:9092

 

創建和啟用複製流

要定義複製流,必須先在MirrorMaker配置文件中定義各自的源集群和目標Kafka集群。

例子:定義兩個集群別名primary和secondary,包括其連接信息:

clusters = primary, secondary
primary.bootstrap.servers = broker10-primary:9092,broker-11-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092,broker6-secondary:9092

 

其次,您必須根據需要使用{source}->{target}.enabled = true顯式啟用單個複製流。請記住,流是有方向性的:如果你需要雙向(雙向)複製,你必須啟用兩個方向的流。

 

# Enable replication from primary to secondary
primary->secondary.enable = true

 

默認情況下,複製流將把除少數特殊主題和消費者組以外的所有主題和消費者組從源群集複製到目標群集,並自動檢測任何新創建的主題和組。在目標集群中複製的主題的名稱將以源集群的名稱為前綴(請參閱下面的進一步章節)。例如,源群集 us-west 中的主題 foo 將被複製到目標群集 us-east 中名為 us-west.foo 的主題。

後面的章節將解釋如何根據你的需要定製這個基本設置。

 

配置複製流

複製流的配置是頂層默認設置(如主題)的組合,在頂層默認設置的基礎上應用特定流的設置(如us-west->us-east.topic)。要更改頂層默認設置,請將相應的頂層設置添加到 MirrorMaker 配置文件中。要僅覆蓋特定複製流的默認值,請使用語法格式{source}->{target}.{config.name}。

最重要的設置是

topics:主題列表或定義源集群中哪些主題要複製的正則表達式(默認:topics = .*)。

topics.exclude:topic的列表或正則表達式,用於隨後排除由topic設置匹配的topic(默認:topic.exclude = .*[\-/\.]internal, .*/\.replica, __.*)

groups:主題列表或正則表達式,用於定義源集群中要複製的消費者群體(默認:group = .*)。

groups.exclude:主題列表或正則表達式,用於隨後排除由 groups 設置匹配的消費者組(默認情況下:groups.exclude = console-consumer-.*, connect-.*, __.*)。

{source}->{target}.enable:設置為true以啟用複製流(默認:false)。

例如:

# Custom top-level defaults that apply to all replication flows
topics = .*
groups = consumer-group1, consumer-group2

# Don't forget to enable a flow!
us-west->us-east.enable = true

# Custom settings for specific replication flows
us-west->us-east.topics = foo.*
us-west->us-east.groups = bar.*
us-west->us-east.emit.heartbeats = false

 

支持其他配置設置,其中一些設置列在下面。在大多數情況下,你可以讓這些設置保持默認值。更多細節請參見MirrorMakerConfig和MirrorConnectorConfig。

refresh.topics.enabled:是否定期檢查源集群中的新主題(默認:true)。

refresh.topics.interval.seconds:在源集群中檢查新主題的頻率;比默認值低的值可能會導致性能下降(默認值:6000,每十分鐘一次)。

refresh.groups.enabled:是否定期檢查源群集中的新消費群集(默認:true)。

refresh.groups.interval.seconds:在源群集中檢查新消費者群集的頻率;低於默認值可能會導致性能下降(默認值:6000,每十分鐘一次)。

sync.topic.configs.enabled:是否從源集群複製topic配置(默認:true)。

sync.topic.acls.enabled:是否從源集群同步ACL(默認:true)。

emit.heartbeats.enabled:是否周期性地發出心跳聲(默認:true)。

emit.heartbeats.interval.seconds:發出心跳的頻率(默認:5,每五秒一次)。

heartbeats.topic.replication.factor:MirrorMaker內部心跳主題的複製因子(默認:3)。

emit.checkpoints.enabled:是否定期發射MirrorMaker的消費偏移量(默認:true)。

emit.checkpoints.interval.seconds:發出檢查點的頻率(默認:60,每分鐘)。

checkpoints.topic.replication.factor:MirrorMaker內部檢查點主題的複製因子(默認:3)。

sync.group.offsets.enabled:是否定期將複製的消費群組(在源群組中)的翻譯偏移量寫入目標群組中的__consumer_offsets主題,只要該群組中沒有活躍的消費者連接到目標群組(默認:true)。

sync.group.offsets.interval.seconds:同步消費組偏移量的頻率(默認:60,每分鐘)。

offset-syncs.topic.replication.factor:MirrorMaker內部offset-syncs主題的複製因子(默認:3)

確保複製流的安全

MirrorMaker支持與Kafka Connect相同的安全設置,所以請參考連結部分了解更多信息。

例如:加密MirrorMaker和us-east集群之間的通信:

us-east.security.protocol=SSL
us-east.ssl.truststore.location=/path/to/truststore.jks
us-east.ssl.truststore.password=my-secret-password
us-east.ssl.keystore.location=/path/to/keystore.jks
us-east.ssl.keystore.password=my-secret-password
us-east.ssl.key.password=my-secret-password

自定義命名目標集群中重複的主題

在目標集群中複製的主題--有時稱為遠程主題--會根據複製策略重新命名。MirrorMaker使用該策略來確保來自不同集群的事件(也就是記錄、消息)不會被寫入同一個主題分區。默認情況下,按照DefaultReplicationPolicy,目標集群中複製的主題名稱具有{source}.{source_topic_name}的格式。

 

us-west us-east
========= =================
bar-topic
foo-topic --> us-west.foo-topic

您可以使用 replication.policy.separator 設置自定義分隔符(默認:.)

# 自定義一個分隔符
us-west->us-east.replication.policy.separator = _

如果你需要進一步控制複製的主題的命名方式,你可以實現一個自定義的ReplicationPolicy,並在MirrorMaker配置中覆蓋replication.policy.class(默認為DefaultReplicationPolicy)。

 

防止配置衝突

MirrorMaker進程通過其目標Kafka集群共享配置。當針對同一目標集群操作的MirrorMaker進程之間的配置不同時,這種行為可能會導致衝突。

例如,以下兩個MirrorMaker進程會很狂野:

# Configuration of process 1
A->B.enabled = true
A->B.topics = foo

# Configuration of process 2
A->B.enabled = true
A->B.topics = bar

在這種情況下,兩個進程將通過集群B共享配置,從而導致衝突。根據這兩個進程中哪個進程是當選的 "領導者",結果將是主題foo或主題欄被複製,但不是兩個都被複製。

因此,在複製到同一目標集群的各個複製流中保持MirrorMaker配置的一致性是非常重要的。例如,可以通過自動化工具或為整個組織使用單一的共享MirrorMaker配置文件來實現這一點。

 

最佳實踐:遠程消費,本地生產

為了最大限度地減少延遲("生產者滯後"),建議將MirrorMaker進程定位在儘可能靠近其目標集群的地方,即它生產數據的集群。這是因為Kafka生產者通常比Kafka消費者在不可靠或高延遲的網絡連接中掙扎得更多。

 

First DC Second DC
========== =========================
primary ---- MirrorMaker --> secondary
(remote) (local)

 

要運行這種 "從遠程消費,生產到本地 "的設置,請在目標集群附近,最好是在同一位置運行MirrorMaker進程,並在--clusters命令行參數中明確設置這些 "本地 "集群(以空格分隔的集群別名列表)。

 

# Run in secondary's data center, reading from the remote `primary` cluster
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary

 

--clusters secondary告訴MirrorMaker進程,給定的集群就在附近,並防止它複製數據或向其他遠程位置的集群發送配置。

 

例子:主動/被動高可用性部署 主動/被動高可用性部署

下面的例子顯示了將主題從主環境複製到輔助Kafka環境的基本設置,但不是從輔助環境複製回主環境。請注意,大多數生產設置將需要進一步的配置,例如安全設置。

 

# Unidirectional flow (one-way) from primary to secondary cluster
primary.bootstrap.servers = broker1-primary:9092
secondary.bootstrap.servers = broker2-secondary:9092

primary->secondary.enabled = true
secondary->primary.enabled = false

primary->secondary.topics = foo.* # only replicate some topics

 

示例:主動/主動高可用性部署

下面的例子顯示了在兩個集群之間以兩種方式複製主題的基本設置。請注意,大多數生產設置將需要進一步的配置,例如安全設置。

 

# Bidirectional flow (two-way) between us-west and us-east clusters
clusters = us-west, us-east
us-west.bootstrap.servers = broker1-west:9092,broker2-west:9092
Us-east.bootstrap.servers = broker3-east:9092,broker4-east:9092

us-west->us-east.enabled = true
us-east->us-west.enabled = true

關於防止複製 "循環 "的注意點(即topic將原本從A複製到B,然後被複製的topic將再次從B複製到A,以此類推)。只要在同一個MirrorMaker配置文件中定義上述流程,就不需要明確添加topic.exclude設置來防止兩個集群之間的複製循環。

 

例子: 多集群地理複製

讓我們把前面幾節的所有信息放在一個更大的例子中。想像一下,有三個數據中心(西、東、北),每個數據中心有兩個Kafka集群(例如,西-1、西-2)。本節的示例展示了如何配置MirrorMaker (1)用於每個數據中心內的Active/Active複製,以及(2)用於跨數據中心複製(XDCR)。

首先,在配置中定義源集群和目標集群以及它們的複製流:

# Basic settings
clusters: west-1, west-2, east-1, east-2, north-1, north-2
west-1.bootstrap.servers = ...
west-2.bootstrap.servers = ...
east-1.bootstrap.servers = ...
east-2.bootstrap.servers = ...
north-1.bootstrap.servers = ...
north-2.bootstrap.servers = ...

# Replication flows for Active/Active in West DC
west-1->west-2.enabled = true
west-2->west-1.enabled = true

# Replication flows for Active/Active in East DC
east-1->east-2.enabled = true
east-2->east-1.enabled = true

# Replication flows for Active/Active in North DC
north-1->north-2.enabled = true
north-2->north-1.enabled = true

# Replication flows for XDCR via west-1, east-1, north-1
west-1->east-1.enabled = true
west-1->north-1.enabled = true
east-1->west-1.enabled = true
east-1->north-1.enabled = true
north-1->west-1.enabled = true
north-1->east-1.enabled = true

 

然後,在每個數據中心,啟動一個或多個MirrorMaker,如下所示。

 

# In West DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters west-1 west-2

# In East DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters east-1 east-2

# In North DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters north-1 north-2

 

通過這種配置,向任何集群生產的記錄都將在數據中心內複製,也會跨到其他數據中心。通過提供--clusters參數,我們可以確保每個MirrorMaker進程只向附近的集群生產數據。

注意:從技術上講,這裡並不需要--clusters參數。MirrorMaker在沒有它的情況下也能正常工作。但是,吞吐量可能會受到數據中心之間 "生產者滯後 "的影響,你可能會產生不必要的數據傳輸成本。

 

開始地理複製

您可以根據需要運行儘可能少的或儘可能多的MirrorMaker進程(認為:節點、伺服器)。因為MirrorMaker是基於Kafka Connect的,所以配置為複製相同Kafka集群的MirrorMaker進程以分布式設置運行。它們會相互找到對方,共享配置(見下節),負載平衡它們的工作,等等。例如,如果你想提高複製流的吞吐量,一個選項是並行運行額外的MirrorMaker進程。

要啟動一個MirrorMaker進程,請運行命令:

 

$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

 

啟動後,可能需要幾分鐘的時間,直到MirrorMaker進程第一次開始複製數據。

如前所述,可以選擇設置參數--clusters,以確保MirrorMaker進程只向附近的集群產生數據。

 

# Note: The cluster alias us-west must be defined in the configuration file
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties \
--clusters us-west

 

測試複製消費者組時的注意事項。默認情況下,MirrorMaker不會複製kafka-console-consumer.sh工具創建的消費者組,你可能會用它在命令行上測試你的MirrorMaker設置。如果你確實也想複製這些消費者組,請相應地設置groups.exclude配置(默認:groups.exclude = console-consumer-.*,connect-.*,__.*)。完成測試後,記得再次更新配置。

 

停止地理複製

你可以通過使用命令發送SIGTERM信號來停止正在運行的MirrorMaker進程。

 

$ kill <MirrorMaker pid>

 

應用配置更改

要使配置更改生效,必須重新啟動MirrorMaker進程。

 

監測地理複製

建議監控MirrorMaker進程,以確保所有定義的複製流都能正常運行。MirrorMaker建立在Connect框架上,繼承了Connect的所有度量,比如source-record-poll-rate。此外,MirrorMaker還在kafka.connect.mirror度量組下產生自己的度量。度量被標記為以下屬性。

source:源集群的別名(例如,主集群)。

target:目標群組的別名(如二級)。

topic:在目標群組上複製的主題

partition:被複製的分區

對每個複製的主題進行度量跟蹤。可以從主題名稱推斷出源集群。例如,從primary->secondary複製topic1將產生如下指標。

target=secondary

topic=primary.topic1

partition=1


發出以下指標:

 

# MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.w]+),topic=([-.w]+),partition=([0-9]+)

record-count # number of records replicated source -> target
record-age-ms # age of records when they are replicated
record-age-ms-min
record-age-ms-max
record-age-ms-avg
replication-latency-ms # time it takes records to propagate source->target
replication-latency-ms-min
replication-latency-ms-max
replication-latency-ms-avg
byte-rate # average number of bytes/sec in replicated records

# MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+)

checkpoint-latency-ms # time it takes to replicate consumer offsets
checkpoint-latency-ms-min
checkpoint-latency-ms-max
checkpoint-latency-ms-avg

 

這些指標並不區分create-at和log-append時間戳。

 

6.4 kfaka配置重要的客戶端配置

最重要的生產者配置是。

acks

compression

batch size

最重要的消費者配置是獲取大小。
所有的配置在配置部分都有記載。

生產伺服器配置

下面是一個生產伺服器配置的例子:

# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]

# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]

# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]

 

我們的客戶端配置在不同的用例之間有相當大的差異。

6.5 Java版本

支持Java 8和Java 11。如果啟用了TLS,Java 11的性能會顯著提高,所以強烈推薦使用(它還包括一些其他性能改進。G1GC、CRC32C、Compact Strings、線程本地握手等)。) 從安全角度來看,我們推薦最新發布的補丁版本,因為舊的免費版本已經披露了安全漏洞。用基於OpenJDK的Java實現(包括Oracle JDK)運行Kafka的典型配置是:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent

 

以下是LinkedIn最繁忙的一個集群(高峰期)使用上述Java參數的統計數據,供參考:

60個broker

5萬個分區(複製係數為2)

80萬條/秒的信息

入300 MB/秒,出1 GB/秒以上

該集群中的所有broker,90%的GC暫停時間約為21ms,每秒只有不到1個新生代GC。

 

6.6 硬體和作業系統

我們使用的是雙四核Intel Xeon機器,內存為24GB。
你需要足夠的內存來緩衝活躍的讀寫器。你可以通過假設你希望能夠緩衝30秒,並計算你的內存需求為write_throughput*30,對內存需求做一個包絡後的估計。

磁碟的吞吐量很重要。我們有8x7200轉的SATA硬碟。一般來說,磁碟吞吐量是性能瓶頸,磁碟越多越好。取決於你如何配置刷新行為,你可能會或可能不會受益於更昂貴的磁碟(如果你經常強制刷新,那麼更高轉速的SAS驅動器可能會更好)。

 

作業系統

Kafka應該在任何unix系統上運行良好,並且已經在Linux和Solaris上進行了測試。
我們已經看到了一些在Windows上運行的問題,Windows目前不是一個很好的支持平臺,儘管我們很樂意改變這種情況。

它不太可能需要太多的作業系統級別的調整,但是有三個潛在的重要的作業系統級別的配置。

文件描述符限制。Kafka對日誌段和開放連接使用文件描述符。如果一個broker託管了許多分區,考慮到broker至少需要(number_of_partitions)*(partition_size/segment_size)來跟蹤所有的日誌段,此外還有broker的連接數。我們建議至少為broker進程提供100000個允許的文件描述符作為起點。注意:mmap()函數為與文件描述符 fildes 相關聯的文件添加了一個額外的引用,這個引用不會被隨後對該文件描述符的 close()刪除。當沒有更多的映射到文件時,這個引用就會被刪除。

最大套接字緩衝區大小:可以增加,以實現這裡所說的數據中心之間的高性能數據傳輸。

一個進程可能擁有的內存映射區域的最大數量(又名vm.max_map_count)。參見Linux內核文檔。在考慮一個broker可能擁有的最大分區數時,你應該關注這個作業系統級別的屬性。默認情況下,在一些Linux系統上,vm.max_map_count的值是65535左右。每個分區分配的每個日誌段,都需要一對索引/時間索引文件,每個文件消耗1個地圖區域。換句話說,每個日誌段使用2個地圖區域。因此,每個分區只要承載一個日誌段,就至少需要2個地圖區域。也就是說,在一個broker上創建50000個分區將導致100000個地圖區域的分配,並且很可能導致broker在默認的vm.max_map_count系統上以OutOfMemoryError(地圖失敗)的方式崩潰。請記住,每個分區的日誌段數根據段的大小、負載強度、保留策略的不同而不同,一般來說,往往會超過一個。

磁碟和文件系統

我們建議使用多個驅動器來獲得良好的吞吐量,並且不要與應用程式日誌或其他作業系統文件系統活動共享用於Kafka數據的相同驅動器,以確保良好的延遲。你可以將這些硬碟一起RAID成一個卷,或者將每個硬碟格式化並掛載為自己的目錄。由於Kafka具有複製功能,RAID提供的冗餘也可以在應用層提供。這種選擇有幾個權衡。
如果你配置了多個數據目錄,分區將被輪流分配到數據目錄中。每個分區將完全在其中一個數據目錄中。如果數據在分區之間沒有得到很好的平衡,就會導致磁碟之間的負載不平衡。

RAID有可能在平衡磁碟之間的負載方面做得更好(儘管它似乎並不總是這樣),因為它在較低的層次上平衡負載。RAID的主要缺點是,它通常會對寫入吞吐量造成很大的性能衝擊,並減少可用的磁碟空間。

RAID的另一個潛在好處是能夠容忍磁碟故障。然而我們的經驗是,重建RAID陣列是如此的I/O密集型,以至於它有效地禁用了伺服器,所以這並沒有提供多少真正的可用性改進。

 

應用程式與作業系統刷新管理

Kafka總是立即將所有數據寫入文件系統,並支持配置刷新策略的功能,控制何時使用刷新將數據從作業系統緩存中強制寫入磁碟。這個刷新策略可以控制在一段時間後或在寫入一定數量的消息後強制將數據放到磁碟上。這個配置有幾種選擇。


Kafka最終必須調用fsync才能知道數據被刷新。當從崩潰中恢復任何不知道是fsync'd的日誌段時,Kafka將通過檢查每個消息的CRC來檢查其完整性,並且還重建附帶的偏移索引文件,作為啟動時執行的恢復過程的一部分。

請注意,Kafka中的耐久性不需要將數據同步到磁碟上,因為失敗的節點總是會從其副本中恢復。

我們建議使用默認的刷新設置,它完全禁用應用程式fsync。這意味著依靠作業系統和Kafka自己的後臺刷新來完成。這為大多數用途提供了最好的選擇:無需調整旋鈕,極大的吞吐量和延遲,以及完全的恢復保證。我們普遍覺得複製提供的保證比同步到本地磁碟更強,然而偏執狂還是可能更喜歡兩者兼備,而且仍然支持應用級fsync策略。

使用應用級刷新設置的缺點是它的磁碟使用模式效率較低(它給作業系統重新排序寫入的餘地較小),而且它可能會引入延遲,因為大多數Linux文件系統中的fsync會阻止對文件的寫入,而後臺刷新做的是更細化的頁級鎖定。

一般來說,你不需要對文件系統進行任何低級別的調整,但在接下來的幾節中,我們將介紹一些這方面的內容,以防有用。

 

了解Linux作業系統的刷新行為

在Linux中,寫入文件系統的數據被保存在pagecache中,直到必須將其寫入磁碟(由於應用程式級的fsync或作業系統自己的刷新策略)。數據的刷新是由一組稱為pdflush的後臺線程(或在2.6.32後的內核中稱為 "flusher線程")完成的。
Pdflush有一個可配置的策略來控制緩存中可以保留多少髒數據,以及在多長時間內必須將其寫回磁碟。這裡描述了這個策略。當Pdflush無法跟上數據寫入的速度時,它最終會導致寫入過程阻塞,從而在寫入過程中產生延遲,減緩數據的積累。

你可以通過執行以下操作來查看當前作業系統的內存使用狀況

 

> cat /proc/meminfo

 

這些值的含義在上面的連結中進行了描述。
與進程內緩存相比,使用pagecache來存儲將寫入磁碟的數據有幾個優勢。

 

文件系統選擇

Kafka使用的是磁碟上的常規文件,因此它對特定的文件系統沒有硬性依賴。不過,使用率最高的兩個文件系統是EXT4和XFS。從歷史上看,EXT4的使用量更大,但最近對XFS文件系統的改進表明,它對Kafka的工作負載有更好的性能特性,而且在穩定性方面沒有任何妥協。

對比測試是在一個具有重大消息負載的集群上進行的,使用各種文件系統創建和掛載選項。Kafka中被監控的主要指標是 "請求本地時間",表示追加操作所需的時間。XFS帶來了更好的本地時間(160ms vs. 最佳EXT4配置的250ms+),以及更低的平均等待時間。XFS性能也顯示出磁碟性能的變化較小。

一般文件系統注意事項

對於任何用於數據目錄的文件系統,在Linux系統上,建議在掛載時使用以下選項。

XFS文件系統注意事項

XFS文件系統有大量的自動調優功能,所以無論是在創建文件系統時還是在掛載時,都不需要改變默認設置。唯一值得考慮的調優參數是

EXT4文件系統注意事項

EXT4是Kafka數據目錄的一個服務性的文件系統選擇,然而要想獲得最大的性能,需要調整幾個掛載選項。此外,這些選項在故障情況下一般是不安全的,會導致更多的數據丟失和損壞。對於單個broker故障,這並不是什麼大問題,因為可以擦除磁碟,並從集群中重建副本。在多重故障的情況下,例如斷電,這可能意味著底層文件系統(因此也是數據)的損壞,不容易恢復。以下選項可以調整。

data=writeeback: Ext4默認為data=ordered,它對一些寫入進行了強烈的排序。Kafka不需要這個排序,因為它對所有未刷新的日誌進行非常偏執的數據恢復。這個設置去掉了排序約束,似乎可以顯著降低延遲。

禁用日誌。日誌是一種權衡:它使伺服器崩潰後的重啟速度更快,但它引入了大量額外的鎖定,增加了寫入性能的變數。那些不關心重啟時間並希望減少寫延遲峰值的主要來源的人可以完全關閉日誌。

commit=num_secs。這將調整 ext4 向元數據日誌提交的頻率。將其設置為較低的值可以減少崩潰時未刷新數據的損失。將其設置為較高的值將提高吞吐量。

nobh:當使用data=writeback模式時,該設置控制額外的排序保證。這對Kafka來說應該是安全的,因為我們不依賴於寫排序,並提高吞吐量和延遲。

delalloc。延遲分配意味著文件系統避免分配任何塊,直到物理寫發生。這允許ext4分配一個大的範圍,而不是較小的頁面,並有助於確保數據按順序寫入。這個特性對於吞吐量來說是非常好的。它似乎涉及到文件系統中的一些鎖定,這增加了一點延遲差異。

 

6.7 監測

Kafka在伺服器中使用Yammer Metrics進行度量報告。Java客戶端使用Kafka Metrics,這是一個內置的度量註冊表,可以最大限度地減少拉到客戶端應用程式中的過渡性依賴關係。兩者都通過JMX暴露度量,並且可以配置使用可插拔的統計報告器來報告統計數據,以掛接到您的監控系統。
所有的Kafka速率指標都有一個相應的累計計數指標,後綴為-total。例如,records-consumed-rate有一個對應的度量,名為records-consumed-total。

查看可用指標的最簡單方法是啟動jconsole,並將其指向正在運行的kafka客戶端或伺服器;這將允許使用JMX瀏覽所有指標。

使用JMX進行遠程監控的安全注意事項

Apache Kafka 默認情況下是禁用遠程 JMX 的。您可以通過為使用 CLI 或標準 Java 系統屬性啟動的進程設置環境變量 JMX_PORT 來啟用遠程 JMX 的遠程監控,從而以編程方式啟用遠程 JMX。在生產場景中啟用遠程 JMX 時,您必須啟用安全功能,以確保未經授權的用戶無法監視或控制您的經紀商或應用程式以及運行這些應用程式的平臺。請注意,在Kafka中,默認情況下,JMX的身份驗證是被禁用的,對於生產部署,必須通過為使用CLI啟動的進程設置環境變量KAFKA_JMX_OPTS或設置適當的Java系統屬性來覆蓋安全配置。請參閱使用JMX技術的監控和管理,了解有關JMX安全的詳細信息。


我們對以下指標進行圖形化和警報:

 

描述指標名稱值域消息速率kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
客戶端消息速率kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
從其他broker同步消息速率kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
請求速率kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower}
錯誤速率kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+)按請求類型和錯誤代碼計算的響應中的錯誤數量。如果一個響應包含多個錯誤,則會計算所有錯誤。error=NONE表示成功的響應。請求大小kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+)每個請求類型的請求大小臨時內存大小kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|Fetch}

用於消息格式轉換和解壓的臨時內存

消息格式轉換時間kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|Fetch}

信息格式轉換所花費的時間,以毫秒為單位。

消息格式轉換速率kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch}MessageConversionsPerSec,topic=([-.\w]+)

需要轉換電文格式的記錄數量。

請求隊列大小kafka.network:type=RequestChannel,name=RequestQueueSize請求隊列大小客戶端的字節輸出速率kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
發送到其他broker字節輸出速率kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
由於沒有為壓縮主題指定密鑰而導致的消息驗證失敗率kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec
因無效魔數導致的信息驗證失敗率kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec
由於錯誤的crc校驗和而導致的信息驗證失敗率kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec
由於批次中的非連續偏移或序列號導致的信息驗證失敗率kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec
日誌刷新速度和時間kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
# 複製不足的分區數(未重新分配的副本數-ISR數>0)kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions0# 小於最小Isr分區的數量(|ISR| < min.insync.replicas)kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount0# 等於最小Isr分區的數量 (|ISR| = min.insync.replicas)kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount0# 離線的日誌目錄kafka.log:type=LogManager,name=OfflineLogDirectoryCount0broker的控制器是否活躍kafka.controller:type=KafkaController,name=ActiveControllerCount

在集群中,應該有一個broker應該是1

選舉leader的速率kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs當出現broker故障時,非零不乾淨的選舉leader的速率kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec0待刪除的topic的數量kafka.controller:type=KafkaController,name=TopicsToDeleteCount
待刪除的分區數量kafka.controller:type=KafkaController,name=ReplicasToDeleteCount
待刪除的不合格的topic的數量kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount
待刪除的不合格的分區數量kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount
分區計數kafka.server:type=ReplicaManager,name=PartitionCount通常是奇數Leader副本計數kafka.server:type=ReplicaManager,name=LeaderCount通常是奇數ISR 收縮速率kafka.server:type=ReplicaManager,name=IsrShrinksPerSec

如果一個broker宕機,部分分區的ISR會縮減。當該broker再次啟動時,一旦副本被完全趕上,ISR將被擴大。除此之外,ISR收縮率和擴張率的預期值都是0。

ISR 擴展速率kafka.server:type=ReplicaManager,name=IsrExpandsPerSec參考上面追隨者和領導者副本之間的消息最大滯後性kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica

滯後時間應與最大的一個生產者請求大小成正比。

每個追隨者副本的消息滯後kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

滯後時間應與最大的一個生產者請求大小成正比。

在生產者發送隊列中等待的請求數量kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce如果使用ack=-1,則為非零。在消費者隊列中等待的請求kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch

大小取決於消費者中的fetch.wait.max.ms。

請求總計用時kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower}

分為隊列、本地、遠程和響應發送時間。

請求在請求隊列中的等待時間kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
請求在leader處處理的時間kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower}
請求等待follower的時間kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower}

當ACK=-1時,生產請求的非零。

請求在響應隊列中的等待時間kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower}
發出答覆的時間kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower}
消費者落後於生產者的信息數量。由消費者發布,而非經紀人發布。kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} Attribute: records-lag-max
網絡處理器空閒的平均時長。kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent

在0和1之間,理想的情況是>0.3。

由於客戶機沒有重新認證,然後使用超過到期時間的連接進行重新認證以外的其他操作而在處理器上斷開的連接數。kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count

理想情況下,當重新認證被啟用時,意味著不再有任何舊的、2.2.0之前的客戶端連接到這個(監聽器、處理器)組合上。

由於客戶機沒有重新認證,然後在過期後將連接用於除重新認證以外的其他用途而斷開的連接總數,跨越所有處理器。kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount

理想情況下,當重新認證被啟用時,意味著不再有任何舊的、2.2.0之前的客戶端連接到這個broker。

請求處理程序線程空閒的平均時長。kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent

在0和1之間,理想的情況是>0.3。

每個(用戶、客戶端ID)、用戶或客戶端ID的帶寬配額指標kafka.server:type={Produce|Fetch},user=([-.\w]+),client-id=([-.\w]+)

兩個屬性。throttle-time表示客戶端被節流的時間,單位為ms。理想情況下=0.byte-rate表示客戶端的數據產生/消耗率,單位為字節/秒。對於(user,client-id)配額,用戶和client-id都要指定。如果對客戶端應用了per-client-id配額,則不指定用戶。如果應用了per-user配額,則不指定client-id。

按(用戶、客戶機ID)、用戶或客戶機ID申請配額指標。kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+)

兩個屬性.throttle-time表示客戶端被節流的時間,單位為ms。理想情況下=0.request-time表示在broker網絡和I/O線程中處理來自客戶端組的請求所花費的時間百分比。對於(user,client-id)配額,用戶和client-id都要指定。如果對客戶端應用了per-client-id配額,則不指定用戶。如果應用了per-user配額,則不指定client-id。

免於節流的請求kafka.server:type=Request

exempt-throttle-time表示經紀網絡和I/O線程處理免於節流的請求所花費的時間百分比。

ZooKeeper客戶端請求延遲kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs

從broker發出ZooKeeper請求的延遲,以毫秒計。

ZooKeeper連接狀態kafka.server:type=SessionExpireListener,name=SessionState

經紀人ZooKeeper會話的連接狀態可能是Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired。

加載組元數據的最大時間kafka.server:type=group-coordinator-metrics,name=partition-load-time-max

從過去30秒內加載的消費者偏移分區加載偏移量和組元數據所花費的最大時間(包括等待加載任務被安排的時間),以毫秒為單位。

加載組元數據的平均時間kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg

過去30秒內從消費者偏移分區加載偏移量和組元數據所花費的平均時間(包括等待加載任務被安排的時間),單位為毫秒。

加載交易元數據的最大時間kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max

從過去30秒內加載的消費者偏移分區加載事務元數據所花費的最大時間(包括等待加載任務被安排的時間),單位為毫秒。

載入交易元數據的平均時間kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg

從過去30秒內加載的消費者偏移分區中加載事務元數據所需的平均時間(包括等待加載任務被安排的時間),以毫秒為單位。

消費者組偏離量統計kafka.server:type=GroupMetadataManager,name=NumOffsets

消費者組提交的偏移量總數

消費者組總數kafka.server:type=GroupMetadataManager,name=NumGroups消費者組的總數消費者組總數,每種狀態kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead]處於不同狀態的消費者組總數。PreparingRebalance, CompletingRebalance, Empty, Stable, Dead重新分配分區的數量kafka.server:type=ReplicaManager,name=ReassigningPartitions每臺broker機器上正在重新分配的leader分區的數量重新分配流量的出站字節率kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec
重新分配流量的傳入字節率kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec
生產者/消費者/連接/流的共同監測指標以下指標在生產者/消費者/連接者/流實例中可用。具體的指標,請看以下章節。度量值描述度量名稱connection-close-rate窗口中每秒鐘關閉的連接數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)connection-close-total窗口中關閉的連接總數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)connection-creation-rate窗口中每秒鐘建立的新連接。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)connection-creation-total窗口中建立的新連接總數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)network-io-rate每秒鐘對所有連接的平均網絡操作次數(讀或寫)。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)network-io-total所有連接上的網絡操作總數(讀或寫)。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)outgoing-byte-rate每秒鐘向所有伺服器發送的平均外發字節數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)outgoing-byte-total發送給所有伺服器的總字節數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)request-rate平均每秒發出的請求數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)request-total發送的請求總數;kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)request-size-avg窗口中所有請求的平均大小。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)request-size-max窗口中發送的任何請求的最大尺寸。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)incoming-byte-rate從所有socket上讀取的字節/秒。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)incoming-byte-total從所有套接字讀取的總字節數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)response-rate每秒鐘收到的答覆。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)response-total收到的答覆總數kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)select-rateI/O層每秒檢查新I/O執行的次數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)select-totalI/O層檢查新I/O執行的總次數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)io-wait-time-ns-avgI/O線程等待一個準備好讀或寫的socket的平均時間,單位為納秒。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)io-wait-ratioI/O線程等待的時間分數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)io-time-ns-avg每次選擇調用的平均I/O時間長度,單位為納秒。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)io-ratioI/O線程做I/O的時間分數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)connection-count當前的活動連接數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)successful-authentication-rate每秒鐘使用SASL或SSL成功認證的連接。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)successful-authentication-total使用SASL或SSL成功認證的連接總數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)failed-authentication-rate認證失敗的每秒連接數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)failed-authentication-total未能通過認證的連接總數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)successful-reauthentication-rate每秒鐘使用SASL成功重新認證的連接。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)successful-reauthentication-total使用SASL成功重新認證的連接總數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)reauthentication-latency-max觀察到的因重新認證而產生的最大延遲,單位為毫秒。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)reauthentication-latency-avg觀察到的平均延遲時間,以毫秒為單位,由於重新認證。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)failed-reauthentication-rate重新認證失敗的每秒連接數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)failed-reauthentication-total未能重新認證的連接總數。kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)successful-authentication-no-reauth-total不支持重新認證的2.2.0之前的舊版SASL客戶端成功認證的連接總數。只能為非零kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)每個broker的指標 生產者/消費者/連接/流以下指標在生產者/消費者/連接者/流實例中可用。具體的指標,請看以下章節。屬性名稱描述MBEAN NAMEoutgoing-byte-rate節點平均每秒發送的外發字節數。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)outgoing-byte-total一個節點發送出去的總字節數。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)request-rate一個節點平均每秒發送的請求數。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)request-total為一個節點發送的總請求數。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)request-size-avg一個節點的窗口中所有請求的平均大小。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)request-size-max一個節點在窗口中發送的任何請求的最大尺寸。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)incoming-byte-rate節點每秒收到的平均字節數。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)incoming-byte-total節點收到的總字節數。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)request-latency-avg節點的平均請求延遲,單位為ms。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)request-latency-max節點的最大請求延遲,單位為ms。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)response-rate節點每秒收到的響應。kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)response-total一個節點收到的答覆總數kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) 生產者監測

生產者實例有以下指標。

屬性名稱描述MBEAN NAMEwaiting-threads等待緩衝區內存查詢記錄的用戶線程阻塞的數量。kafka.producer:type=producer-metrics,client-id=([-.\w]+)buffer-total-bytes客戶端可以使用的最大緩衝區內存量(無論當前是否使用)。kafka.producer:type=producer-metrics,client-id=([-.\w]+)buffer-available-bytes未被使用的緩衝區內存總量(未分配或在空閒列表中)。kafka.producer:type=producer-metrics,client-id=([-.\w]+)bufferpool-wait-time應用者等待空間分配的時間分數。kafka.producer:type=producer-metrics,client-id=([-.\w]+)生產者sender線程指標kafka.producer:type=producer-metrics,client-id="{client-id}" 屬性名稱描述
batch-size-avg

每個分區每次請求發送的平均字節數。


batch-size-max

每個分區每次請求發送的最大字節數。


batch-split-rate

每秒平均分批次數


batch-split-total

批次分割的總數量


compression-rate-avg

記錄批次的平均壓縮率,定義為壓縮批次大小與未壓縮大小的平均比率。


metadata-age

當前正在使用的生產者元數據的年齡,以秒為單位。


produce-throttle-time-avg

請求被broker節流的平均時間,以毫秒計


produce-throttle-time-maxbroker對請求進行節流的最大時間,以毫秒為單位。
record-error-rate

平均每秒鐘發送的導致錯誤的記錄數量。


record-error-total

導致錯誤的記錄發送總數。


record-queue-time-avg

以ms為單位的記錄批次在發送緩衝區中花費的平均時間。


record-queue-time-max

記錄批次在發送緩衝區中花費的最大時間,單位為ms。


record-retry-rate

平均每秒鐘的重試記錄發送次數。


record-retry-total

重試記錄發送的總次數


record-send-rate

平均每秒發送的記錄數。


record-send-total

發送的記錄總數。


record-size-avg平均記錄大小
record-size-max

最大記錄大小


records-per-request-avg

每次請求的平均記錄數量;


request-latency-avg

平均請求延遲時間(ms)


request-latency-max

最大請求延遲,單位為ms


requests-in-flight

目前等待答覆的飛行中請求的數量。

kafka.producer:type=producer-topic-metrics,client-id="{client-id}",topic="{topic}" 屬性名稱描述
byte-rate

一個主題平均每秒發送的字節數。


byte-total

一個主題的總發送字節數。


compression-rate

一個主題的記錄批次的平均壓縮率,定義為壓縮後的批次大小與未壓縮大小的平均比例。


record-error-rate

每秒鐘平均發送記錄的次數,導致一個主題出現錯誤。


record-error-total

導致一個主題出錯的記錄發送總數。


record-retry-rate

一個主題的平均每秒鐘重試記錄發送次數。


record-retry-total

一個主題的重試記錄發送總數。


record-send-rate

一個主題平均每秒發送的記錄數。


record-send-total一個主題的總發送記錄數。 消費者監測

消費者實例有以下指標:

屬性名稱描述MBEAN NAMEtime-between-poll-avgpoll()調用之間的平均延遲。kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)time-between-poll-maxpoll()調用之間的最大延遲。kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)last-poll-seconds-ago上次調用poll()後的秒數。kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)poll-idle-ratio-avg消費者的poll()相對於等待用戶代碼處理記錄的平均空閒時間的分數。kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)消費者組指標屬性名稱描述MBEAN NAMEcommit-latency-avg提交請求的平均時間kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)commit-latency-max提交請求所需的最大時間kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)commit-rate每秒鐘的提交次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)commit-total提交總次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)assigned-partitions當前分配給該消費者的分區數量。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)heartbeat-response-time-max接收心跳請求響應所需的最大時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)heartbeat-rate平均每秒心跳次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)heartbeat-total心跳的總次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)join-time-avg小組重新加入的平均時間kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)join-time-max小組重新加入所需的最長時間kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)join-rate每秒鐘加入的組數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)join-total消費者組加盟的總機器數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)sync-time-avg組同步的平均時間kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)sync-time-max組同步的最大時間kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)sync-rate每秒鐘的組同步次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)sync-total群體同步的總次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)rebalance-latency-avg組內重新平衡所需的平均時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)rebalance-latency-max組內重新平衡所需的最大時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)rebalance-latency-total迄今為止,消費者組重新平衡所需的總時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)rebalance-total消費者組再平衡總數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)rebalance-rate-per-hour每小時再平衡次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)failed-rebalance-total失敗的消費者組再平衡次數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)failed-rebalance-rate-per-hour每小時失敗的再平衡總數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)last-rebalance-seconds-ago最後一次發生再平衡過去的時間 秒數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)last-heartbeat-seconds-ago最後一次收到控制器的心跳信號過去的時間 秒數kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)partitions-revoked-latency-avg由on-partitions-revoked rebalance監聽器回調的平均時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)partitions-revoked-latency-max由on-partitions-revoked rebalance監聽器回調的最大時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)partitions-assigned-latency-avg分區分配的重新平衡監聽器回調所需的平均時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)partitions-assigned-latency-max分區分配的重新平衡監聽器回調所需的最大時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)partitions-lost-latency-avg分區丟失的重新平衡監聽器回調的平均時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)partitions-lost-latency-maxon-partitions-lost rebalance監聽器回調的最大時間。kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)消費者拉取性能kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}" 屬性名稱描述
bytes-consumed-rate

每秒鐘消耗的平均字節數。


bytes-consumed-total

消耗的總字節數


fetch-latency-avg拉取一個請求的平均時間
fetch-latency-max拉取一個請求最大時間
fetch-rate每秒鐘拉取的請求的數量
fetch-size-avg每秒鐘拉取的請求的平均大小 字節數
fetch-size-max

每次請求獲取的最大字節數。


fetch-throttle-time-avg

平均節流時間(毫秒)


fetch-throttle-time-max

最大節流時間(ms)


fetch-total

獲取請求的總次數。


records-consumed-rate

每秒鐘消耗的平均記錄數


records-consumed-total

消耗的記錄總數


records-lag-max

該窗口中任何分區的記錄數的最大滯後量。


records-lead-min

該窗口中任何分區的記錄數的最小前導值。


records-per-request-avg

每次請求的平均記錄數量

kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",topic="{topic}" 屬性名稱描述
bytes-consumed-rate

一個主題每秒消耗的平均字節數。


bytes-consumed-total

一個主題消耗的總字節數。


fetch-size-avg

一個主題每次請求所獲取的平均字節數。


fetch-size-max

一個主題每次請求獲取的最大字節數。


records-consumed-rate

某一主題每秒消耗的平均記錄數。


records-consumed-total

某一主題所消耗的記錄總數。


records-per-request-avg

每項請求的平均記錄數。

kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}" 屬性名稱描述
preferred-read-replica

分區的當前讀取副本,如果從領導讀取,則為-1。


records-lag

分區的最新滯後時間


records-lag-avg

分區的平均滯後時間


records-lag-max分區的最大滯後時間
records-lead分區的領先程度
records-lead-avg分區的平均領先幅度
records-lead-min分割的最小領先幅度 連接監測

一個Connect worker流程包含所有生產者和消費者的度量,以及Connect特有的度量。Worker進程本身有許多指標,而每個連接器和任務都有額外的指標。

[2020-12-04 10:03:05,630] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)

[2020-12-04 10:03:05,632] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)

kafka.connect:type=connect-worker-metrics 屬性名稱描述
connector-count

該worker中運行的連接器數量。


connector-startup-attempts-total

該worker嘗試過的連接器啟動總數。


connector-startup-failure-percentage

這個worker的連接器啟動失敗的平均比例。


connector-startup-failure-total

失敗的連接器啟動總數。


connector-startup-success-percentage

這個worker的連接器啟動成功的平均百分比。


connector-startup-success-total

成功的連接器啟動總數。


task-count

該worker運行的任務數量。


task-startup-attempts-total

該worker嘗試過的任務啟動總數。


task-startup-failure-percentage

這個worker的任務啟動失敗的平均百分比。


task-startup-failure-total

任務啟動失敗的總次數。


task-startup-success-percentage

這個worker的任務開始成功的平均百分比。


task-startup-success-total

成功的任務啟動總數。

kafka.connect:type=connect-worker-metrics,connector="{connector}" 屬性名稱描述
connector-destroyed-task-count

worker身上連接器的銷毀任務數量。


connector-failed-task-count

worker身上的連接器的失敗任務數。


connector-paused-task-count

worker上連接器的暫停任務數。


connector-running-task-count

worker上連接器的運行任務數。


connector-total-task-count

worker上的連接器的任務數量。


connector-unassigned-task-count

worker上的連接器的未分配任務數。

kafka.connect:type=connect-worker-rebalance-metrics 屬性名稱描述
completed-rebalances-total

worker完成的再平衡總數。


connect-protocol

該集群使用的Connect協議


epoch

worker的時代或代數。


leader-name

組leader的名字。


rebalance-avg-time-ms

worker重新平衡所花費的平均時間(毫秒)。


rebalance-max-time-ms

worker重新平衡所花費的最大時間(毫秒)。


rebalancing

worker目前是否在重新平衡。


time-since-last-rebalance-ms

worker完成最近一次重新平衡後的時間,以毫秒為單位。

kafka.connect:type=connector-metrics,connector="{connector}" 屬性名稱描述
connector-class

連接器類的名稱。


connector-type

連接器的類型。"source "或 "sink "之一。


connector-version

連接器類的版本,由連接器報告。


status

連接器的狀態。"未分配"、"運行"、"暫停"、"失敗 "或 "銷毀 "中的一種。

kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}" 屬性名稱描述
batch-size-avg

連接器處理的平均批次大小。


batch-size-max

連接器處理的最大批次大小。


offset-commit-avg-time-ms

該任務提交偏移量所需的平均時間(毫秒)。


offset-commit-failure-percentage

該任務的偏移提交嘗試失敗的平均百分比。


offset-commit-max-time-ms

該任務提交偏移量所需的最大時間(毫秒)。


offset-commit-success-percentage

該任務的偏移提交嘗試成功的平均百分比。


pause-ratio

該任務在暫停狀態下所花費的時間分數。


running-ratio

該任務在運行狀態下所花費的時間分數。


status

連接器任務的狀態。未分配"、"運行"、"暫停"、"失敗 "或 "銷毀 "中的一種。

kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}" 屬性名稱描述
offset-commit-completion-rate

平均每秒鐘成功完成的偏移提交次數。


offset-commit-completion-total

成功完成的偏移提交總數。


offset-commit-seq-no

當前偏移提交的序列號。


offset-commit-skip-rate

平均每秒鐘收到的太晚和跳過/忽略的偏移提交完成數。


offset-commit-skip-total

太晚收到的和跳過/忽略的偏移提交完成總數。


partition-count

分配給該任務的主題分區的數量,屬於該worker中命名的sink連接器。


put-batch-avg-time-ms

這個任務把一批匯記錄的平均時間。


put-batch-max-time-ms

這個任務放一批匯記錄所需的最大時間。


sink-record-active-count

已從Kafka讀取但尚未完全提交/刷新/被sink任務認可的記錄數量。


sink-record-active-count-avg

已從Kafka讀取但尚未完全提交/刷新/被sink任務認可的記錄的平均數量。


sink-record-active-count-max

從Kafka讀取但尚未完全提交/刷新/被sink任務認可的記錄的最大數量。


sink-record-lag-max

對於任何主題分區而言,匯任務落後於消費者位置的記錄數的最大滯後。


sink-record-read-rate

此任務從 Kafka 讀取的平均每秒記錄數,屬於此 worker 中命名的 sink 連接器。這是在應用轉換之前。


sink-record-read-total

自該任務上次重啟以來,該任務從Kafka讀取的屬於該worker中命名的sink連接器的記錄總數。


sink-record-send-rate

從轉換中輸出並發送到該任務的記錄的平均每秒數量,這些記錄屬於該工作者中指定的匯接器。這是在應用了轉換之後的數據,不包括任何被轉換過濾掉的記錄。


sink-record-send-total

自任務最後一次重啟以來,從變換中輸出並發送給該任務的屬於該worker中命名的sink連接器的記錄總數。

kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}" 屬性名稱描述
poll-batch-avg-time-ms

該任務輪詢一批源記錄所需的平均時間(毫秒)。


poll-batch-max-time-ms

該任務輪詢一批源記錄所需的最大時間(毫秒)。


source-record-active-count

這個任務已經產生但還沒有完全寫入Kafka的記錄數量。


source-record-active-count-avg

該任務已產生但尚未完全寫入Kafka的平均記錄數。


source-record-active-count-max

該任務已產生但尚未完全寫入Kafka的最大記錄數。


source-record-poll-rate

此任務在此工作者中屬於命名的源連接器的平均每秒鐘產生/輪詢(轉換前)的記錄數。


source-record-poll-total

該任務產生/輪詢的記錄總數(轉換前),屬於該工作者中的指定源連接器。


source-record-write-rate

從轉換中輸出並寫入 Kafka 的記錄的平均每秒記錄數,該任務屬於該 Worker 中的命名源連接器。這是在應用了轉換之後的數據,不包括任何被轉換過濾掉的記錄。


source-record-write-total

自任務最後一次重啟以來,該任務從變換中輸出並寫入Kafka的記錄數量,這些記錄屬於該worker中命名的源連接器。

kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}" 屬性名稱描述
deadletterqueue-produce-failures

向掛掉隊列寫入失敗的次數。


deadletterqueue-produce-requests

試圖寫入掛掉的隊列的次數。


last-error-timestamp

該任務最後一次遇到錯誤時的時間戳。


total-errors-logged

記錄的錯誤數量。


total-record-errors

本任務中記錄處理錯誤的次數。


total-record-failures

該任務中記錄處理失敗的次數。


total-records-skipped

由於錯誤而跳過的記錄數量。


total-retries重試的操作次數。 流監測

一個Kafka Streams實例包含所有的生產者和消費者指標,以及Streams特有的附加指標。默認情況下,Kafka Streams的度量有兩個記錄級別:debug和info。

需要注意的是,度量有一個4層的層次結構。在頂層,有每個啟動的Kafka Streams客戶端的客戶端級指標。每個客戶端都有流線程,有自己的度量。每個流線程有任務,有自己的指標。每個任務有若干個處理器節點,有自己的指標。每個任務也有許多狀態存儲和記錄緩存,都有自己的度量。

使用下面的配置選項來指定你要收集哪些度量。

metrics.recording.level="info"

 客戶端指標

以下所有的指標都有一個記錄級別的信息。

屬性名稱描述MBEAN NAMEversionKafka Streams客戶端的版本。kafka.streams:type=stream-metrics,client-id=([-.\w]+)commit-idKafka Streams客戶端的版本控制提交ID。kafka.streams:type=stream-metrics,client-id=([-.\w]+)application-idKafka Streams客戶端的應用ID。kafka.streams:type=stream-metrics,client-id=([-.\w]+)topology-descriptionKafka Streams客戶端中執行的拓撲的描述。kafka.streams:type=stream-metrics,client-id=([-.\w]+)stateKafka Streams客戶端的狀態。kafka.streams:type=stream-metrics,client-id=([-.\w]+)線程指標

以下所有的指標都有一個記錄級別的信息。

指標名稱描述MBEAN NAMEcommit-latency-avg該線程所有運行任務的平均執行時間,以毫秒為單位。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)commit-latency-max該線程所有正在運行的任務中,提交的最大執行時間,單位為ms。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)poll-latency-avg平均執行時間,單位為毫秒,用於消費者投票。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)poll-latency-max最大執行時間,以ms為單位,用於消費者輪詢。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)process-latency-avg平均執行時間,以毫秒為單位,進行處理。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)process-latency-max最大執行時間,單位為ms,進行處理。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)punctuate-latency-avg平均執行時間,單位為毫秒,用於標點符號。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)punctuate-latency-max標點符號的最大執行時間,單位為ms。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)commit-rate平均每秒提交的次數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)commit-total提交的總次數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)poll-rate平均每秒鐘的消費者poll調用數量。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)poll-total消費者poll調用的總數量。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)process-rate平均每秒處理的記錄數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)process-total處理的記錄總數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)punctuate-rate平均每秒鐘的標點符號調用次數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)punctuate-total標點符號調用的總次數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)task-created-rate平均每秒創建的任務數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)task-created-total創建的任務總數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)task-closed-rate平均每秒關閉的任務數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)task-closed-total結束的任務總數。kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+)任務指標

除了drop-records-rate和drop-records-total這兩個指標的記錄級別為info之外,以下所有指標的記錄級別為debug。

指標名稱描述MBEAN NAMEprocess-latency-avg平均執行時間以ns為單位,進行處理。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)process-latency-max處理的最大執行時間,以ns為單位。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)process-rate該任務的所有源處理器節點每秒處理的平均記錄數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)process-total該任務的所有源處理器節點的處理記錄總數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)commit-latency-avg提交的平均執行時間,以ns為單位。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)commit-latency-max提交的最大執行時間,單位為ns。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)commit-rate平均每秒的提交調用次數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)commit-total提交commit的總次數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)record-lateness-avg記錄的平均觀察延遲時間(流時間-記錄時間戳)。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)record-lateness-max觀察到的最大記錄延遲(流時間-記錄時間戳)。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)enforced-processing-rate平均每秒的強制處理次數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)enforced-processing-total強制處理的總數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)dropped-records-rate本任務內平均掉落的記錄數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)dropped-records-total本任務中放棄的記錄總數。kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)節點指標處理器

以下指標僅在某些類型的節點上可用,即進程-速率和進程-總數僅在源處理器節點上可用,壓制-發射-速率和壓制-發射-總數僅在壓制操作節點上可用。所有的指標都有一個記錄級別的調試。

指標名稱描述MBEAN NAMEprocess-rate源處理器節點平均每秒處理的記錄數。kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)process-total源處理器節點每秒處理的記錄總數。kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)suppression-emit-rate從抑制操作節點下遊發出的記錄的速度。kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)suppression-emit-total從抑制操作節點下遊發出的記錄總數。kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)狀態存儲指標

以下所有指標的記錄級別均為debug。需要注意的是,對於用戶自定義的狀態存儲,存儲範圍值在StoreSupplier#metricsScope()中指定;對於內置的狀態存儲,目前我們有。

 

in-memory-state

in-memory-lru-state

in-memory-window-state

in-memory-suppression (for suppression buffers)

rocksdb-state (for RocksDB backed key-value store)

rocksdb-window-state (for RocksDB backed window store)

rocksdb-session-state (for RocksDB backed session store)

 

壓制緩衝區大小-平均值、壓制緩衝區大小-最大值、壓制緩衝區計數-平均值和壓制緩衝區計數-最大值等指標只適用於壓制緩衝區。所有其他指標都不適用於壓制緩衝區。

指標名稱描述MBEAN NAMEput-latency-avg平均執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-latency-max最大執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-if-absent-latency-avg平均put-if-absent執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-if-absent-latency-max最大的put-if-absent執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)get-latency-avg平均獲取執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)get-latency-max最大獲取執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)delete-latency-avg平均刪除執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)delete-latency-max最大刪除執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-all-latency-avg平均put-all執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-all-latency-max最大put-all執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)all-latency-avg所有操作的平均執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)all-latency-max所有操作的最大執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)range-latency-avg平均範圍執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)range-latency-max最大範圍執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)flush-latency-avg平均刷新執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)flush-latency-max最大刷新執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)restore-latency-avg平均還原執行時間(ns)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)restore-latency-max最大還原執行時間,單位為ns。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-rate這個存儲的平均投放率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-if-absent-rate這個存儲的平均放如果率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)get-rate這個存儲的平均獲取速率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)delete-rate這個存儲的平均刪除速率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)put-all-rate這個存儲的平均put-all速率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)all-rate這個存儲的所有操作的平均速率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)range-rate該存儲的平均範圍速率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)flush-rate這個存儲的平均衝洗率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)restore-rate這個存儲的平均還原率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)suppression-buffer-size-avg採樣窗口中緩衝數據的平均總大小,以字節為單位。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)suppression-buffer-size-max採樣窗口中緩衝數據的最大總大小,以字節為單位。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)suppression-buffer-count-avg採樣窗口中緩衝的平均記錄數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+)suppression-buffer-count-max採樣窗口中緩衝的最大記錄數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) RocksDB指標

RocksDB的度量分為基於統計的度量和基於屬性的度量。前者記錄的是RocksDB狀態存儲收集的統計數據,而後者記錄的是RocksDB公開的屬性。RocksDB收集的統計數據提供了一段時間內的累積測量值,例如寫入狀態存儲的字節數。RocksDB暴露的屬性提供了當前的測量值,例如,當前使用的內存量。請注意,目前內置的RocksDB狀態存儲的存儲範圍如下。

rocksdb-state (for RocksDB backed key-value store)

rocksdb-window-state (for RocksDB backed window store)

rocksdb-session-state (for RocksDB backed session store)


基於RocksDB統計的指標。以下所有基於統計的指標都有一個記錄級別的調試,因為在RocksDB中收集統計數據可能會對性能產生影響。基於統計的指標每分鐘都會從RocksDB狀態存儲中收集。如果一個狀態存儲由多個RocksDB實例組成,就像隨著時間和會話窗口的聚合一樣,每個度量都會報告狀態存儲的RocksDB實例的聚合情況。

指標名稱描述MBEAN NAMEbytes-written-rate平均每秒寫入RocksDB狀態存儲的字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)bytes-written-total寫入RocksDB狀態存儲的總字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)bytes-read-rate平均每秒從RocksDB狀態存儲中讀取的字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)bytes-read-total從RocksDB狀態存儲中讀取的總字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)memtable-bytes-flushed-rate平均每秒從內存表到磁碟上刷新的字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)memtable-bytes-flushed-total從memtable刷新到磁碟的總字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)memtable-hit-ratio相對於所有查找到的memtable來說,memtable的點擊率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)block-cache-data-hit-ratio數據塊的塊緩存點擊率相對於數據塊的所有查找到塊緩存的比率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)block-cache-index-hit-ratio索引塊的塊緩存點擊率相對於索引塊的所有查找到塊緩存的比率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)block-cache-filter-hit-ratio過濾器塊的塊緩存點擊率相對於過濾器塊的所有查找到塊緩存的比率。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)write-stall-duration-avg寫入停頓的平均持續時間(毫秒)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)write-stall-duration-total寫入停頓的總時間,單位為ms。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)bytes-read-compaction-rate壓實過程中平均每秒讀取的字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)bytes-written-compaction-rate壓實過程中平均每秒寫入的字節數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)number-open-files當前打開的文件數量。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)number-file-errors-total文件錯誤發生的總次數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)

RocksDB基於屬性的度量:以下所有基於屬性的度量都有記錄級別的信息,並在度量被訪問時被記錄。如果一個狀態存儲由多個RocksDB實例組成,就像隨著時間和會話窗口的聚合一樣,每個度量都會報告該狀態存儲的所有RocksDB實例的總和,除了塊緩存度量block-cache-*。如果每個實例都使用自己的塊緩存,則塊緩存度量報告所有RocksDB實例的總和,如果所有實例共享一個塊緩存,則只報告一個實例的記錄值。

指標名稱描述MBEAN NAMEnum-immutable-mem-table還沒有刷新的不可更改的記憶表的數量。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)cur-size-active-mem-table活動內存表的大概大小,以字節為單位。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)cur-size-all-mem-tables活動的和未刷新的不可更改的內存表的大致大小,以字節為單位。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)size-all-mem-tables活動的、未刷新的不可變的和釘住的不可變的memtables的大概大小,單位是字節。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-entries-active-mem-table活動記憶表中的條目數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-entries-imm-mem-tables未刷新的不可更改記憶表的條目數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-deletes-active-mem-table活動記憶表的刪除條數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-deletes-imm-mem-tables未刷新的不可更改記憶表的刪除條目數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)mem-table-flush-pending如果記憶表刷新待定,該指標報告1,否則報告0。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-running-flushes當前正在運行的衝洗總數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)compaction-pending如果至少有一次壓實正在進行,則該指標報告1,否則報告0。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-running-compactions當前運行的壓實次數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)estimate-pending-compaction-bytes估計壓實需要在磁碟上重寫的總字節數,以使所有層級降到目標大小以下(僅對層級壓實有效)。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)total-sst-files-size所有SST文件的總大小,以字節為單位。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)live-sst-files-size屬於最新LSM樹的所有SST文件的總大小,以字節為單位。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)num-live-versionsLSM樹的實際版本數量。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)block-cache-capacity塊狀緩存的容量,單位為字節。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)block-cache-usage駐留在塊緩存中的內存大小,單位為字節。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)block-cache-pinned-usage塊緩存中被釘入的條目的內存大小,單位為字節。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)estimate-num-keys激活的和未刷新的不可更改的記憶表和存儲中的估計密鑰數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)estimate-table-readers-mem讀取SST表所使用的估計內存,以字節為單位,不包括塊緩存中使用的內存。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)background-errors背景錯誤的總數。kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) 記錄緩存指標4

以下所有指標的記錄級別均為調試。

指標名稱描述MBEAN NAMEhit-ratio-avg平均緩存命中率定義為緩存讀取命中量與緩存讀取請求總量的比率。kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)hit-ratio-min最小緩存命中率。kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)hit-ratio-max最大緩存命中率。kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) 其他

我們建議監控GC時間等統計和各種伺服器統計,如CPU利用率、I/O服務時間等。在客戶端,我們建議監控消息/字節速率(全局和每個主題)、請求速率/大小/時間,在消費端,監控所有分區間消息的最大滯後和最小取回請求速率。對於消費者來說,最大滯後需要小於一個閾值,最小獲取率需要大於0。

6.8 Zookeeper穩定版

目前的穩定分支是3.5。Kafka會定期更新,包括3.5系列的最新版本。

操作ZooKeeper

在操作上,為了健康的ZooKeeper安裝,我們做到以下幾點。

物理/硬體/網絡布局的冗餘:儘量不要把它們放在同一個機架上,體面的(但不要太瘋狂)硬體,儘量保持冗餘的電源和網絡路徑等。一個典型的ZooKeeper合集有5或7臺伺服器,分別可以容忍2臺和3臺伺服器宕機。如果你的部署規模較小,那麼使用3臺伺服器也是可以接受的,但要記住,這種情況下你只能容忍1臺伺服器宕機。

I/O隔離:如果你做了很多寫類型的流量,你幾乎肯定會希望事務日誌在一個專用磁碟組上。對事務日誌的寫入是同步的(但為了性能而分批進行),因此,並發寫入會明顯影響性能。ZooKeeper快照可以是這樣一個並發寫入的源頭,理想的情況是應該寫在與事務日誌分開的磁碟組上。快照是異步寫入磁碟的,所以一般情況下與作業系統和消息日誌文件共享是可以的。你可以通過dataLogDir參數配置伺服器使用一個單獨的磁碟組。

應用隔離。除非你真的了解你想安裝在同一個盒子上的其他應用程式的應用模式,否則隔離運行ZooKeeper是個不錯的主意(儘管這可能是一個與硬體能力平衡的行為)。

使用虛擬化時要小心。根據你的集群布局和讀/寫模式和SLA,它可以工作,但虛擬化層引入的微小開銷可能會增加,並拋開ZooKeeper,因為它可能對時間非常敏感。

ZooKeeper配置。它是java,確保你給它足夠的堆空間(我們通常用3-5G運行它們,但這主要是由於我們這裡的數據集大小)。不幸的是,我們沒有一個好的公式,但請記住,允許更多的ZooKeeper狀態意味著快照會變得很大,而大的快照會影響恢復時間。事實上,如果快照變得太大了(幾GB),那麼你可能需要增加initLimit參數,以給伺服器足夠的時間來恢復和加入合集。

監控。JMX和4個字母詞(4lw)命令都非常有用,它們在某些情況下會重疊(在這些情況下,我們更喜歡4個字母命令,它們看起來更可預測,或者至少,它們與LI監控基礎設施配合得更好)

不要過度構建集群:大型集群,特別是在重寫使用模式下,意味著大量的集群內通信(寫入和後續集群成員更新的配額),但不要過度構建(並有可能淹沒集群)。擁有更多的伺服器會增加你的讀取能力。

總的來說,我們儘量讓ZooKeeper系統保持小規模,因為它可以處理負載(加上標準的增長容量規劃),並且儘可能的簡單。與官方版本相比,我們儘量不做任何花哨的配置或應用程式布局,以及保持儘可能的自我控制。由於這些原因,我們傾向於跳過作業系統的打包版本,因為它有一種傾向,即試圖把東西放在作業系統標準的層次結構中,這可能是 "混亂的",想要一個更好的方式來形容它。

未完待續······

相關焦點

  • Kafka官方文檔中文版+Kafka面試題詳解!(可複製)
    今天分享的這份資料,包括121頁的Kafka官方中文文檔和Kafka常見面試題,由於內容過多,在此只以截圖展示部分內容,詳細完整版的文檔領取方式請見文末。這份Kafka官方中文文檔既適合小白入門也適合有一定基礎的同學進階提升
  • 騰訊TIM安卓版v2.1.0測試版:長按聊天圖片可識別文字
    今天騰訊官方發布了TIM安卓版v2.1.0測試版,本次更新主要包括長按聊天圖片識別圖中文字、雲文件接入WPS雲盤等。版本2.1.0中的更新內容:長按聊天時收發的圖片,可識別圖中文字,並支持收藏、搜索、翻譯雲文件接入WPS雲盤,可選擇雲盤內的文件進行瀏覽、下載及發送給好友TIM安卓版v2.1.0測試版下載地址:傳送門騰訊TIM是一款專注於團隊辦公協作的跨平臺溝通工具,提供雲文件、在線文檔、郵件、日程、收藏等好用的辦公功能,界面簡潔清晰,能夠實現QQ
  • Python3.9官方文檔翻譯版python簡介之數字
    Python3.9官方文檔翻譯版之篇首語Python3.9官方文檔翻譯版之摘要部分Python3.9官方文檔翻譯版之解釋器的使用1Python3.9官方文檔翻譯版之解釋器的使用2
  • 利用flume+kafka+storm+mysql構建大數據實時系統
    Kafka版本:0.8.0Kafka下載及文檔:http://kafka.apache.org/Kafka安裝:> tar xzf kafka-<VERSION>.://www.python.org/ftp/python/2.7.2/Python-2.7.2.tgz# tar zxvf Python-2.7.2.tgz# cd Python-2.7.2# .
  • Python3.9官方文檔翻譯版python簡介之字符串
    Python3.9官方文檔翻譯版python簡介之數字Python3.9官方文檔翻譯版之解釋器的使用1Python3.9官方文檔翻譯版之解釋器的使用2Python3.9官方文檔翻譯版之篇首語Python3.9官方文檔翻譯版之摘要部分
  • Python3.9官方文檔翻譯版python簡介之列表
    Python3.9官方文檔翻譯版python簡介之字符串Python3.9官方文檔翻譯版python簡介之數字Python3.9官方文檔翻譯版之篇首語Python3.9官方文檔翻譯版之摘要部分
  • 眾包翻譯文檔分享 |《 gRPC 官方文檔中文版》
    《gRPC 官方文檔中文版》日前在開源中國眾包平臺翻譯完成,現發布與各位 OSCer 共享:http://doc.oschina.net
  • 最新 | Python 官方中文文檔正式發布!
    Python 官方文檔終於發布中文版了!受英語困擾的小夥伴終於可以更輕鬆地閱讀官方文檔了。嘗鮮地址:https://docs.python.org/zh-cn/3/最新發布的這個 Python 官方文檔可以自由選擇語言,包括:英文、法文、日文、韓文和中文。
  • 10種快速翻譯文檔的方法
    方法三有道翻譯:http://fanyi.youdao.com注意:這裡我們採用的谷歌翻譯網址不是我們平常用的 cn 域名,而是國際版或者香港版的域名(可能有同學是打不開的)。在翻譯框下面會有一個上傳文檔的連結,同樣可以上傳文檔進行翻譯。對於國內版的谷歌翻譯(https://translate.google.cn),翻譯框下面是沒有這個上傳文檔的連結的。
  • 掘金翻譯計劃優質文檔
    一切為了翻譯,為了一切翻譯,為了翻譯一切。掘金翻譯計劃 是一個翻譯優質網際網路技術文章的社區,文章來源為 掘金 上的英文分享文章。內容覆蓋區塊鏈、人工智慧、Android、iOS、前端、後端、設計、產品和其他 等領域,以及各大型優質 官方文檔及手冊,讀者為熱愛新技術的新銳開發者。掘金翻譯計劃目前翻譯完成 1520 篇文章,官方文檔及手冊 13 個,共有 1000 餘名譯者貢獻翻譯和校對。
  • Logstash讀取Kafka數據寫入HDFS詳解
    user => "hadmin"        path => "/logs/nginx/%{index.date}/%{index.hour}.log"        codec => "json"    }    stdout { codec => rubydebug }}logstash配置文件分為三部分
  • Tensorflow 官方版教程中文版
    2015年11月9日,Google發布人工智慧系統TensorFlow並宣布開源,同日,極客學院組織在線TensorFlow中文文檔翻譯
  • Python官方中文文檔來了
    小夥伴們已經習慣了原汁原味的英文文檔,但如果有官方中文文檔,那麼查閱或理解速度都會大大提升。本文將介紹隱藏在 Python 官網的中文文檔~以前也是有一些第三方維護的 Python 中文文檔,不過可能因為人力等限制,並做不到同步更新與維護。目前也有很多高質量的 Python 中文資源,但大部分都是大牛寫的書或教程,官方文檔的翻譯並得不到保證。
  • Bootstrap 4 官方文檔中文版全國首發
    Bootstrap 官方於 2015 年 8 月 19 日發布了其最新版本 Bootstrap v4.0.0-alpha。Bootstrap 4 的開發是一項浩大的工程,將近一年的開發,幾乎是對整個項目的重寫,終於在 Bootstrap 4 周歲生日的時候推出了最新的 v4-alpha 版本。
  • GitHub官方中文文檔翻譯上線:Fork成了分叉
    GitHub官方中文文檔翻譯上線:Fork成了分叉 近期微軟旗下的GitHub發布了簡體中文版的技術幫助文檔,能夠讓開發者用戶更好地理解GitHub上的操作規範。
  • 搜狗翻譯App上線文檔翻譯功能
    據了解,搜狗翻譯App是第一個免費支持「文檔翻譯」的綜合類翻譯應用,目前可支持中英、中日、中韓三個語種間的文檔快速互譯。文獻材料、學術論文、技術文檔、小說著作等文檔均能得到快速精準翻譯,幫助年輕學生群體、白領人群放下PC電腦依然可以放眼世界,輕鬆學習或工作。  搜狗翻譯App「文檔翻譯」的最大亮點便是支持移動端文檔的實時翻譯與便捷儲存,解決了此前移動翻譯情境中的諸多痛點。
  • Android Studio提示與技巧(官方文檔翻譯)
  • honey select2最新版M5.0 漢化破解版下載
    honey select2最新版M5.0 漢化破解版下載 來源:財訊網 • 2020-08-18 10:35:55
  • 在線英文翻譯、文檔翻譯,這幾款翻譯工具你值得擁有
    >2)進入應用中心後,右上角搜索「翻譯」,找到相關插件,點擊安裝即可下圖也可看到,在精品推薦一欄,直接推薦的有翻譯插件,插件上備註說明是官方翻譯工具,也可點此直接安裝(彩雲小譯是很專業的翻譯平臺,對於學術文獻等的翻譯準確率高,同時,20M以下的文檔,每月有5次免費下載機會。
  • 新手福音,機器學習工具Sklearn 中文文檔 0.19版(最新)
    Sklearn具有以下特點:    簡單高效的數據挖掘和數據分析工具    讓每個人能夠在複雜環境中重複使用    建立NumPy、Scipy、MatPlotLib之上文檔獲取方式關注微信公眾號 datayx  然後回復 sk  即可獲取。