Kafka入門教程 (二) : Consumer使用(topic,partition和offset)

2021-12-27 開發者小棧

Consumer線程不安全,不能多線程共用

topic與partition

上文提到:一個Topic的一個Partition,只能被同一個ConsumerGroup的一個消費者消費,這裡主要介紹Consumer啟動時指定topic和partition的使用。

只指定topic

即每個consumer只指定需要消費的topic,高級消費,對應kafka-clients中的Consumer.subscribe()方法

假定,1-N個consumer,屬於同一個group。根據consumer的個數,由kafka-clinets分配每個consumer消費的partition,分配策略見後文。注意:必須使用合理的分配策略,否則可能出現一些consumer沒有分配partition的情況。

若N>partition num(所有topic的partition總和), 則一些consumer不會被分配partition

若N<partition num,則某些consumer會消費多個partition

當消費多個partition時,消費每個分區內的消息是有序的,但消費多個分區之間的消息是無序的(可以在消費記錄中獲得當前記錄的partition)

指定topic和partition

即為每個consumer指定需要消費的topic和partition,也即所說的低級消費,對應kafka-clients中的Consumer.assign()方法。

這種情況下,不再由kafka-clinets分配,指定哪個partition消費哪個,所以,當同一個消費組指定重複的partition時,會消費到重複的數據(完全重複的數據,因為poll的offset是本地維護的),但是server端只有一個offset!

所以,這種模式下,需要開發者自己保證同一個消費組的消費著具有不重複的partition。

那麼,為什麼要使用低級消費者呢?

高級消費partition的分配是由kafka-clinets完成的,但是會查詢server端的信息,所以集群環境下,當沒有指定partition時,每加入/離開一個消費者,kafka-clients都會重新平衡(reblance)partition的分配,這個時候,如果有消費完成但是沒有提交的offset,reblance時則會造成數據的重複消費或者數據丟失(具體是哪種情況,要看offset的提交策略)。低級消費則不會發生reblance!

注意 :Spring-kafka多線程消費的配置下,指定topic和partition時,也是低級消費,其線程和partition的分配策略見後續spring-kafka的教程。

partition分配策略

range: 得到topic-partitions關係,得到topic-consumers關係,然後,按照topic進行分配,即topic的所有partition按順序分配到其所有的consumer上,舉例:topicA-3partition, topicB-1partition, 4 consumers, 過程是,A的3個partition分配到consumer1-3,B的1個partition分配到consumer1,consumer4空閒,所以使用的最大線程數=max(topic*partition)

roundrobin:topics和patition組合,上述例子,就是ta-0,ta-1,ta-2,tb-0,然後四個取hashcode得到順序,然後挨個分配到consumer上(要求:每一個consumer消費的topics有相同的streams&&這個消費組中每個consumer消費的topics必須完全相同)

上面的文字可能有描述不準確或不清楚的地方,這裡列出了官方對著兩種策略的解釋:

RoundRobin:

The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.)

(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance and thread-id within that instance. Therefore, round-robin assignment is allowed only if:

a) Every topic has the same number of streams within a consumer instance

b) The set of subscribed topics is identical for every consumer instance within the group.

Range:

Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1

Reblance時發生了什麼?

查看kafka-clients源碼可以發現:

AbstractCoordinator 有詳細說明調用subscribe方法發生了以下的事請

consumer註冊到到服務端

coordinator(server端維護的一個服務)查找所有的該組consumer,選取leander

如果auto commit為true,所有的consumer提交本地offset到服務端;為false則不提交

leader通過coordinator獲取服務端所有的partition和offset,並使用策略重新分配partition,結果返回給coordinator,coordinator下發分配結果到所有consumer(即jon和leave的reblance)

調用assign則不會使用AbstractCoordinator ,而是直接分配指定的partition。這兩個是高級消費和低級消費的分界點

所以高級消費者集群時,新加入的consumer,如果是auto-commit則會提交offset,若為處理完可能會丟失數據;否則不提交,會重複消費數據。離開consumer,若未提交offset離開,則會重複消費數據;若自動提交了但是未消費,則會丟失數據

offset

如上文所述,kafka高吞吐量的保證是Partition是順序寫磁碟,同樣消費也是順序的,offset維護了一個group的消費者在當前partition消費的數據位置。

當consumer啟動後,會維護一個本地的offset,運行中poll數據使用的是本地offset,不再查詢server;server端也會維護一個offset,新版kafka offset是維護在一個topic中,使用默認的producer更新offset;舊版維護在zookeeper。

提交offset是指:使用本地的offset去更新server端的offset。

一個Consumer的運行過程是:查詢服務端offset–>存在,從offset開始讀取數據;不存在,根據則根據autoreset的策略執行(earliest最開始接收,latest只接收新數據);或者啟動時指定offset–>poll數據,更新本地offset–>自動或手動提交offset,如果提交時未指定offset,則使用本地維護的offset更新服務端,指定了offset,則使用指定的offset更新服務端(!但是poll數據依舊使用的本地的offset,server端的offset僅在Consumer啟動時會使用)

自動提交(不推薦)

自動提交策略下,是每隔指定時間,由kafka-clients自動提交本地維護的offset,默認本地offset=poll的數量+1。(本地offset可以通過seek方法修改)

但是會出現數據丟失的情況,比如poll了一批數據沒有處理完,但是到時間了已經提交了offset,然後程序終止了,下次啟動會從新的offset』啟動,沒有處理的數據丟失了

手動提交

不指定offset:同上,也是提交本地維護的offset,默認本地offset=poll的數量+1。

這種模式下,數據處理完畢(保存/丟棄)後再手動提交,解決了自動模式下的數據丟失問題,但是可能存在消費完的數據,offset沒有提交成功,重複消費數據的問題(可以通過資料庫事務解決)

指定offset:更新server端offset為指定值,但是本地offset不會更新,所以在consumer沒有重啟的情況下,是不會消費到重複數據。

【參考連結】

Kafka-偏移量提交(https://blog.csdn.net/u011669700/article/details/80053313)

更詳細的reblance過程(http://matt33.com/2017/10/22/consumer-join-group/)

相關焦點

  • kafka使用原理介紹
    如果這個consumer group裡面consumer的數量小於topic裡面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,我們不用指定),但是總之這個topic裡面的所有partition都會被處理到的。。
  • Apache Kafka 快速入門指南
    無論是 kafka 集群,還是 consumer 都依賴於 「Zookeeper」 集群保存一些 meta 信息, 來保證系統可用性。二、為什麼要有 Kafka?使用特定的機器硬體,一個 Broker 每秒可以處理成千上萬的分區和百萬量級的消息。
  • kafka入門(原理-搭建-簡單使用)
    乍一看返也太簡單了,不是說了它是分布式嗎,難道把 producer、 broker 和 consumer 放在三臺不同的機器上就算是分布式了嗎。看 kafka 官方給出的圖:多個 broker 協同合作,producer 和 consumer 部署在各個業務邏輯中被頻繁的調用,三者通過 zookeeper管理協調請求和轉發。
  • Kafka 基本原理(8000 字小結)
    1)kafka以topic來進行消息管理,每個topic包含多個partition,每個partition對應一個邏輯log,有多個segment組成。2)每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
  • kafka異步雙活方案 mirror maker2深度解析
    mirror maker2背景通常情況下,我們都是使用一套kafka集群處理業務。但有些情況需要使用另一套kafka集群來進行數據同步和備份。在kafka早先版本的時候,kafka針對這種場景就有推出一個叫mirror maker的工具(mirror maker1,以下mm1即代表mirror maker1),用來同步兩個kafka集群的數據。
  • Kafka簡明教程
    二代Kafka引入了Partition的概念,也就是採用多條隊列, 每條隊列裡面的消息都是相同的topic:不過這是屬於比較高級的應用了,以後有機會再和大家討論。Kafka二代足夠完美了嗎?當然不是,我們雖然通過Partition提升了性能,但是我們忽略了一個很重要的問題——高可用。萬一機器掛掉了怎麼辦?單點系統總是不可靠的。我們必須考慮備用節點和數據備份的問題。
  • Kafka這些名詞都說不出所以然,您竟然敢說自己精通kafka
    kafka與內存微服務一起使用,提供可靠性,可用於向 CEP(複雜事件流系統)和IoT / IFTTT式自動化系統提供事件。Kafka可以與Flume ,Spark Streaming,Storm,HBase,Flink和Spark一起工作,以實時接收,分析和處理流數據。
  • Kafka快速入門秘籍:背景介紹,應用場景分析、核心架構分析
    在有限資源下,使用消息中間件能夠使系統性能從容倍增!二、kafka簡介Kafka作為一種消息中間件,是一種分布式的,基於發布/訂閱的消息系統。主要設計目標如下:以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能高吞吐率。
  • Kafka官方文檔翻譯-最新版v2.7(三)
    所以創建的第一個文件將是000000000.kafka,每一個額外的文件將有一個整數名,大約是前一個文件的S字節,其中S是配置中給出的最大日誌文件大小。 記錄的確切二進位格式是版本化的,並作為標準接口進行維護,因此在理想的情況下,記錄批次可以在生產者、broker和客戶端之間傳輸,而無需重新複製或轉換。上一節包括了關於記錄的磁碟格式的細節。
  • Kafka設計思想之生產者和消費者
    為了讓生產者實現這個功能,所有的 kafka 伺服器節點都能響應這樣的元數據請求: 哪些伺服器是活著的,主題的哪些分區是主分區,分配在哪個伺服器上,這樣生產者就能適當地直接發送它的請求到伺服器上。客戶端控制消息發送數據到哪個分區,這個可以實現隨機的負載均衡方式,或者使用一些特定語義的分區函數。
  • Kafka分區與消費者的關係
    當然每個主題也可以自己設置分區數量,如果創建主題的時候沒有指定分區數量,則會使用server.properties中的設置。bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1在創建主題的時候,可以使用--partitions選項指定主題的分區數量[root@localhost
  • Kafka文件存儲機制
    一、文件結構06.Kafka文件存儲機制01.png說明Kafka 當中消息是以 topic 進行分類的,生產者通過 topic 向 Kafka broker 發送消息,消費者通過 topic 讀取數據;topic 在物理層面又能以 partition 為分組, 一個 topic 可以分成若干個 partition
  • 從未如此簡單:10分鐘帶你逆襲Kafka!
    我們這裡使用 Kafka 的 Zookeeper,只啟動一個節點,但是正真的生產過程中,是需要 Zookeeper 集群,自己搭建就好,後期我們也會出 Zookeeper 的教程,大家請關注就好了。out.println("topic = " + record.topic());             System.out.println("partition = " + record.partition());             System.
  • 圖文詳解:Kafka到底有哪些秘密讓我對它情有獨鍾呢?
    :消息類別,Kafka 按照 topic 來分類消息partition:topic 的分區,一個 topic 可以包含多個 partition,topic>partition 的數據文件partition 中的每條 Message 包含三個屬性:offset,MessageSize,data,其中 offset 表 示 Message 在這個 partition 中的偏移量,offset 不是該 Message 在 partition 數據文件中的實際存儲位置,而是邏輯上一個值,它唯一確定了
  • Kafka常見錯誤整理(不斷更新中)
    1、UnknownTopicOrPartitionExceptionorg.apache.kafka.common.errors.UnknownTopicOrPartitionException:Thisserverdoesnothostthistopic-partition報錯內容
  • Kafka【入門】就這一篇!
    比如,我創建了一個 Topic 名字為 test ,沒有指定 Partition 的數量,那麼會默認創建一個 test-0 的文件夾,這裡的命名規則是:<topic_name>-<partition_id>。
  • 利用flume+kafka+storm+mysql構建大數據實時系統
    KafkaKafka是一個消息中間件,它的特點是:1、關注大吞吐量,而不是別的特性2、針對實時性場景3、關於消息被處理的狀態是在consumer端維護,而不是由kafka server端維護。4、分布式,producer、broker和consumer都分布於多臺機器上。
  • 如何理解Kafka的消息可靠性策略?
    這些問題都很正常,在開始接觸和使用時總會有這樣或那樣的問題。 一般情況下,不做了解,使用各種默認的推薦值,也是可以work的。 但是我們要優雅的提升自己的姿(知)勢(識)。 學習其背後的原理,至少在遇到一般的問題時,能夠分析和處理問題,做到心中有數。
  • 大白話+13張圖解 Kafka
    1.Topic 主題kafka學習了資料庫裡面的設計,在裡面設計了topic(主題),這個東西類似於關係型資料庫的表此時我需要獲取中國移動的數據,那就直接監聽TopicA即可2.Partition 分區
  • Kafka原理與技術
    Producer 使用 push模式將消息發布到 Broker;Consumer 使用 pull 模式從 Broker 訂閱並消費消息。  Kafka專用術語:Broker:消息中間件處理結點,一個Kafka節點就是一個Broker,多個Broker可以組成一個Kafka集群。Topic:一類消息,Kafka集群能夠同時負責多個topic的分發。