Consumer線程不安全,不能多線程共用
topic與partition上文提到:一個Topic的一個Partition,只能被同一個ConsumerGroup的一個消費者消費,這裡主要介紹Consumer啟動時指定topic和partition的使用。
只指定topic即每個consumer只指定需要消費的topic,高級消費,對應kafka-clients中的Consumer.subscribe()方法
假定,1-N個consumer,屬於同一個group。根據consumer的個數,由kafka-clinets分配每個consumer消費的partition,分配策略見後文。注意:必須使用合理的分配策略,否則可能出現一些consumer沒有分配partition的情況。
若N>partition num(所有topic的partition總和), 則一些consumer不會被分配partition
若N<partition num,則某些consumer會消費多個partition
當消費多個partition時,消費每個分區內的消息是有序的,但消費多個分區之間的消息是無序的(可以在消費記錄中獲得當前記錄的partition)
指定topic和partition即為每個consumer指定需要消費的topic和partition,也即所說的低級消費,對應kafka-clients中的Consumer.assign()方法。
這種情況下,不再由kafka-clinets分配,指定哪個partition消費哪個,所以,當同一個消費組指定重複的partition時,會消費到重複的數據(完全重複的數據,因為poll的offset是本地維護的),但是server端只有一個offset!
所以,這種模式下,需要開發者自己保證同一個消費組的消費著具有不重複的partition。
那麼,為什麼要使用低級消費者呢?高級消費partition的分配是由kafka-clinets完成的,但是會查詢server端的信息,所以集群環境下,當沒有指定partition時,每加入/離開一個消費者,kafka-clients都會重新平衡(reblance)partition的分配,這個時候,如果有消費完成但是沒有提交的offset,reblance時則會造成數據的重複消費或者數據丟失(具體是哪種情況,要看offset的提交策略)。低級消費則不會發生reblance!
注意 :Spring-kafka多線程消費的配置下,指定topic和partition時,也是低級消費,其線程和partition的分配策略見後續spring-kafka的教程。
partition分配策略range: 得到topic-partitions關係,得到topic-consumers關係,然後,按照topic進行分配,即topic的所有partition按順序分配到其所有的consumer上,舉例:topicA-3partition, topicB-1partition, 4 consumers, 過程是,A的3個partition分配到consumer1-3,B的1個partition分配到consumer1,consumer4空閒,所以使用的最大線程數=max(topic*partition)
roundrobin:topics和patition組合,上述例子,就是ta-0,ta-1,ta-2,tb-0,然後四個取hashcode得到順序,然後挨個分配到consumer上(要求:每一個consumer消費的topics有相同的streams&&這個消費組中每個consumer消費的topics必須完全相同)
上面的文字可能有描述不準確或不清楚的地方,這裡列出了官方對著兩種策略的解釋:
RoundRobin:
The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.)
(For simplicity of implementation) the assignor is allowed to assign a given topic-partition to any consumer instance and thread-id within that instance. Therefore, round-robin assignment is allowed only if:
a) Every topic has the same number of streams within a consumer instance
b) The set of subscribed topics is identical for every consumer instance within the group.
Range:
Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition. For example, suppose there are two consumers C1 and C2 with two streams each, and there are five available partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
Reblance時發生了什麼?查看kafka-clients源碼可以發現:
AbstractCoordinator 有詳細說明調用subscribe方法發生了以下的事請
consumer註冊到到服務端
coordinator(server端維護的一個服務)查找所有的該組consumer,選取leander
如果auto commit為true,所有的consumer提交本地offset到服務端;為false則不提交
leader通過coordinator獲取服務端所有的partition和offset,並使用策略重新分配partition,結果返回給coordinator,coordinator下發分配結果到所有consumer(即jon和leave的reblance)
調用assign則不會使用AbstractCoordinator ,而是直接分配指定的partition。這兩個是高級消費和低級消費的分界點
所以高級消費者集群時,新加入的consumer,如果是auto-commit則會提交offset,若為處理完可能會丟失數據;否則不提交,會重複消費數據。離開consumer,若未提交offset離開,則會重複消費數據;若自動提交了但是未消費,則會丟失數據
offset如上文所述,kafka高吞吐量的保證是Partition是順序寫磁碟,同樣消費也是順序的,offset維護了一個group的消費者在當前partition消費的數據位置。
當consumer啟動後,會維護一個本地的offset,運行中poll數據使用的是本地offset,不再查詢server;server端也會維護一個offset,新版kafka offset是維護在一個topic中,使用默認的producer更新offset;舊版維護在zookeeper。
提交offset是指:使用本地的offset去更新server端的offset。
一個Consumer的運行過程是:查詢服務端offset–>存在,從offset開始讀取數據;不存在,根據則根據autoreset的策略執行(earliest最開始接收,latest只接收新數據);或者啟動時指定offset–>poll數據,更新本地offset–>自動或手動提交offset,如果提交時未指定offset,則使用本地維護的offset更新服務端,指定了offset,則使用指定的offset更新服務端(!但是poll數據依舊使用的本地的offset,server端的offset僅在Consumer啟動時會使用)
自動提交(不推薦)自動提交策略下,是每隔指定時間,由kafka-clients自動提交本地維護的offset,默認本地offset=poll的數量+1。(本地offset可以通過seek方法修改)
但是會出現數據丟失的情況,比如poll了一批數據沒有處理完,但是到時間了已經提交了offset,然後程序終止了,下次啟動會從新的offset』啟動,沒有處理的數據丟失了
手動提交不指定offset:同上,也是提交本地維護的offset,默認本地offset=poll的數量+1。
這種模式下,數據處理完畢(保存/丟棄)後再手動提交,解決了自動模式下的數據丟失問題,但是可能存在消費完的數據,offset沒有提交成功,重複消費數據的問題(可以通過資料庫事務解決)
指定offset:更新server端offset為指定值,但是本地offset不會更新,所以在consumer沒有重啟的情況下,是不會消費到重複數據。
【參考連結】
Kafka-偏移量提交(https://blog.csdn.net/u011669700/article/details/80053313)
更詳細的reblance過程(http://matt33.com/2017/10/22/consumer-join-group/)