Kafka 基本原理(8000 字小結)

2020-12-15 酷扯兒

本文轉載自【微信公眾號:java進階架構師,ID:java_jiagoushi】經微信公眾號授權轉載,如需轉載與原文作者聯繫

上一周,師長發了篇:大白話+13張圖解 Kafka,大部分覺得講的很好,但是又有槓粉問了,咋地,還不夠清晰啊,有沒有更清晰的給看下。

先回顧下之前Kafka的系列文章,沒看過的可以先收藏:

RabbitMQ和Kafka到底怎麼選?17 個方面,綜合對比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四個分布式消息隊列大白話+13張圖解 Kafka

這次,再給大家整理了8K字乾貨!收好!

簡介Kafka架構Kafka存儲策略Kafka刪除策略Kafka brokerKafka DesignThe ProducerThe Consumer複製(Replication)日誌壓縮(Log Compaction)DistributionZookeeper協調控制開發環境搭建一些example參考Apache Kafka是分布式發布-訂閱消息系統。它最初由LinkedIn公司開發,之後成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分布式的,分區的和可複製的提交日誌服務。

它的架構包括以下組件:

話題(Topic):是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。生產者(Producer):是能夠發布消息到話題的任何對象。服務代理(Broker):已發布的消息保存在一組伺服器中,它們被稱為代理(Broker)或Kafka集群。消費者(Consumer):可以訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發布的消息。

1)kafka以topic來進行消息管理,每個topic包含多個partition,每個partition對應一個邏輯log,有多個segment組成。

2)每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。

3)每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。

4)發布者發到某個topic的消息會被均勻的分布到多個partition上(或根據用戶指定的路由規則進行分布),broker收到發布消息往對應partition的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁碟,只有flush到磁碟上的消息訂閱者才能訂閱到,segment達到一定的大小後將不會再往該segment寫數據,broker會創建新的segment。

插一下不會懷孕:

1)N天前的刪除。

2)保留最近的MGB數據。

與其它消息系統不同,Kafka broker是無狀態的。這意味著消費者必須維護已消費的狀態信息。這些信息由消費者自己維護,broker完全不管(有offset managerbroker管理)。

從代理刪除消息變得很棘手,因為代理並不知道消費者是否已經使用了該消息。Kafka創新性地解決了這個問題,它將一個簡單的基於時間的SLA應用於保留策略。當消息在代理中超過一定時間後,將會被自動刪除。這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費數據。這違反了隊列的常見約定,但被證明是許多消費者的基本特徵。以下摘抄自kafka官方文檔:

目標

1) 高吞吐量來支持高容量的事件流處理

2) 支持從離線系統加載數據

3) 低延遲的消息系統

持久化

1) 依賴文件系統,持久化到本地

2) 數據持久化到log

效率

1) 解決」small IO problem「:

使用」message set「組合消息。

server使用」chunks of messages「寫到log。

consumer一次獲取大的消息塊。

2)解決」byte copying「:

在producer、broker和consumer之間使用統一的binary message format。

使用系統的pagecache。

使用sendfile傳輸log,避免拷貝。

端到端的批量壓縮(End-to-end Batch Compression)

Kafka支持GZIP和Snappy壓縮協議。

負載均衡

1)producer可以自定義發送到哪個partition的路由規則。默認路由規則:hash(key)%numPartitions,如果key為null則隨機選擇一個partition。

2)自定義路由:如果key是一個user id,可以把同一個user的消息發送到同一個partition,這時consumer就可以從同一個partition讀取同一個user的消息。

異步批量發送

批量發送:配置不多於固定消息數目一起發送並且等待時間小於一個固定延遲的數據。

consumer控制消息的讀取。

Push vs Pull

1)producer push data to broker,consumer pull data from broker

2)consumer pull的優點:consumer自己控制消息的讀取速度和數量。

3)consumer pull的缺點:如果broker沒有數據,則可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有數據。

Consumer Position

1)大部分消息系統由broker記錄哪些消息被消費了,但Kafka不是。

2)Kafka由consumer控制消息的消費,consumer甚至可以回到一個old offset的位置再次消費消息。

Message Delivery Semantics

三種:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有個」acks「配置可以控制接收的leader的在什麼情況下就回應producer消息寫入成功。

Consumer:

讀取消息,寫log,處理消息。如果處理消息失敗,log已經寫入,則無法再次處理失敗的消息,對應」At most once「。讀取消息,處理消息,寫log。如果消息處理成功,寫log失敗,則消息會被處理兩次,對應」At least once「。讀取消息,同時處理消息並把result和log同時寫入。這樣保證result和log同時更新或同時失敗,對應」Exactly once「。Kafka默認保證at-least-once delivery,容許用戶實現at-most-once語義,exactly-once的實現取決於目的存儲系統,kafka提供了讀取offset,實現也沒有問題。

1)一個partition的複製個數(replication factor)包括這個partition的leader本身。

2)所有對partition的讀和寫都通過leader。

3)Followers通過pull獲取leader上log(message和offset)

4)如果一個follower掛掉、卡住或者同步太慢,leader會把這個follower從」in sync replicas「(ISR)列表中刪除。

5)當所有的」in sync replicas「的follower把一個消息寫入到自己的log中時,這個消息才被認為是」committed「的。

6)如果針對某個partition的所有複製節點都掛了,Kafka選擇最先復活的那個節點作為leader(這個節點不一定在ISR裡)。

1)針對一個topic的partition,壓縮使得Kafka至少知道每個key對應的最後一個值。

2)壓縮不會重排序消息。

3)消息的offset是不會變的。

4)消息的offset是順序的。

Consumer Offset Tracking

1)High-level consumer記錄每個partition所消費的maximum offset,並定期commit到offset manager(broker)。

2)Simple consumer需要手動管理offset。現在的Simple consumer Java API只支持commit offset到zookeeper。

Consumers and Consumer Groups

1)consumer註冊到zookeeper

2)屬於同一個group的consumer(group id一樣)平均分配partition,每個partition只會被一個consumer消費。

3)當broker或同一個group的其他consumer的狀態發生變化的時候,consumer rebalance就會發生。

1)管理broker與consumer的動態加入與離開。

2)觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一個consumer group內的多個consumer的訂閱負載平衡。

3)維護消費關係及每個partition的消費信息。

生產者代碼示例:

import java.util.*;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class TestProducer {public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, Stringproducer = new Producer<String, String(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = 「192.168.2.」 + rnd.nextInt(255); String msg = runtime + 「,www.example.com,」 + ip; KeyedMessage<String, Stringdata = new KeyedMessage<String, String("page_visits", ip, msg); producer.send(data); } producer.close(); }}

Partitioning Code:

import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;public class SimplePartitioner implements Partitioner {public SimplePartitioner (VerifiableProperties props) { } public int partition(Object key, int a_numPartitions) { int partition = 0; String stringKey = (String) key; int offset = stringKey.lastIndexOf('.'); if (offset 0) { partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions; } return partition; }}

消費者代碼示例:

import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConsumerGroupExample {private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, IntegertopicCountMap = new HashMap<String, Integer(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]streams = consumerMap.get(topic); // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); }}

ConsumerTest 測試類:

import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); System.out.println("Shutting down Thread: " + m_threadNumber); }}

相關焦點

  • kafka入門(原理-搭建-簡單使用)
    一、kafka介紹與原理我們將消息的發布(publish)稱作 producer,將消息的訂閱(subscribe)表述為 consumer,將中間的存儲陣列稱作 broker(代理),這樣就可以大致描繪出這樣一個場面:生產者將數據生產出來,交給 broker 進行存儲,消費者需要消費數據了
  • kafka消息分區機制原理
    背景kafka如何支撐海量消息的集中寫入?答案就是消息分區。核心思想是:負載均衡,採用合適的分區策略把消息寫到不同的broker上的分區中;其它的產品中有類似的思想。gt; message ;topic是邏輯上的消息容器;partition實際承載消息,分布在不同的kafka
  • 大牛嘔心力作——Kafka開發實戰,助你徜徉大數據時代
    而這時,kafka的出現算是解決了這個問題。Kafka的核心功能是什麼?一言以蔽之,高性能的消息發送與高性能的消息消費。接下來咱們就進入kafka世界,深入實戰探討kafka實戰開發。kafka實戰本書是涵蓋ApacheKafka各方面的具有實踐指導意義的工具書和參考書。
  • Kafka 原理簡介
    Kafka 原理簡介Kafka 是一種高吞吐的分布式發布訂閱的消息系統,可以處理消費者規模的網站中的動作流數據,具有高性能的,持久化,多副本,橫向擴展能力。kafka clusterBroker : Broker 是 kafka 的實例,每個伺服器有一個或者多個 Kafka實例。Kafka 集群內的 broker 有不重複的編號。
  • Kafka動態配置實現原理解析
    發展歷程動態配置文件發展歷程:kafka在0.8.0對topic的管理功能分布在三個shell中,它們分別是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh
  • kafka原理詳解(一)
    順序寫入:每個partition都是一個文件,kafka會把收到的message插入到文件末尾,每個consumer會對每個topic都有一個offset用來表示讀取到了第幾條數據。kafka會把所有的數據都保留下來,但是數據落到磁碟後,會隨著數據增加,而選擇要不要刪除,kafka目前提供兩種機制來刪除,一種是基於時間的,數據默認保留7天,一種是基於partition文件大小的。
  • 一文帶你了解 Kafka 基本原理
    Kafka存儲策略 1)kafka這違反了隊列的常見約定,但被證明是許多消費者的基本特徵。 以下摘抄自kafka官方文檔: Kafka Design 目標 1) 高吞吐量來支持高容量的事件流處理 2) 支持從離線系統加載數據 3) 低延遲的消息系統
  • 由Flink與Kafka實踐探究Kafka的兩個問題
     --topic wiki-result  --broker-list localhost:9092是生產者命令,然而此例中的生產者實際上是Fink監控程序,那麼原作者為何使用kafka-console-producer命令去創建topic而不是用kafka-topics命令呢?
  • �kafka分區副本及消息存儲原理
    分區副本及消息存儲原理通過這樣的副本機制來提高kafka的可用性。副本數據同步原理因此,我們可以開啟kafka的日誌壓縮功能,服務端會在後臺啟動啟動Cleaner線程池,定期將相同的key進行合併,只保留最新的value值。日誌的壓縮原理是:
  • 如何保證kafka消息不丟失
    背景這裡的kafka值得是broker,broker消費者丟失消息kafka消費消息的模型:關閉自動提交位移,消費者端配置參數:enable.auto.commit=false調優broker參數防止消息丟失主要通過調整配置來保證kafka消息不丟失。
  • Kafka設計原理詳解
    當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責讓新分區被其他節點感知到。消費者消費消息的offset記錄機制每個consumer會定期將自己消費分區的offset提交給kafka內部topic:__consumer_offsets,提交過去的時候,key是consumerGroupId+topic+分區號,value就是當前offset的值,kafka會定期清理topic裡的消息,最後就保留最新的那條數據
  • Kafka 架構及原理分析
    segment消費分組消費組數目小於等於 Topic 數目消費者可以消費多個分區消費編號連續消費切換消費者consumer_offset-[0~49] 保存消費者消費的偏移量數據多寫支持業務場景:數據同步存儲到 mysql、ES基於 binlog 實現主從複製canal偽裝為 slave 節點,進行數據同步,解析 binlog,可以對接 kafka
  • kafka連載(kafka的簡介)
    kafka是一種分布式的,基於發布/訂閱的消息管理系統。具備快速、可擴展、可持久化的特點。又因為具有強大的實時性的消息處理能力,所以很多公司都採用它來作為消息中間件。kafka提供了異步通信、緩衝或者峰值處理能力,以及基本的容災擴展、解耦合、數據持久化的能力。具體能力我不在這一一贅述。
  • 快速了解 Kafka 生產者的使用和原理
    作者 | 草捏子整理 | 楊碧玉出品 | 草捏子(ID:chaycao)頭圖 | CSDN 下載自視覺中國本文將學習 Kafka 生產者的使用和原理,文中使用的 kafka-clients 版本號為2.6.0。
  • kafka高吞吐量之消息壓縮
    背景保證kafka消息壓縮模型消息格式V1kafka小結壓縮的目的是較少空間佔用,帶來傳輸速度的提升,但是需要消耗一定的cpu ;是一種提高kafka消息吞吐量的有效辦法。本節回顧了新版的kafka是如何對消息進行壓縮的,壓縮和解壓縮的流程是怎樣的,然後對比了常見的4種壓縮算法,根據具體的使用場景來選擇是否啟用壓縮,以及選擇合適的壓縮算法。然後給出了壓縮的配置參數,在producer和borker端都可以使用compression.type來設置。
  • Kafka的生產者原理及重要參數說明
    Kafka的Producer原理首先我們得先有個集群吧,然後集群中有若干臺伺服器,每個伺服器我們管它叫Broker,其實就是一個個Kafka進程。生產者代碼設置參數部分// 創建配置文件對象Properties props = new Properties();// 這個參數目的是為了獲取kafka
  • 阿里愛問Kafka?阿里P9整理出Kafka寶典,太真實了
    kafka 如何不消費重複數據?Offeset 極限是多少?過了極限又是多少?如何實現 exactly once?不用 zk,怎麼管理集群元數據信息?Kafka Producer 如何優化打入速度?這裡推薦一份 Kafka 進階精品文檔——《深入理解kafka:核心設計與原理實踐》能讓你系統理解 Kafka
  • ApacheKafka社區中千金難求的一份最火卡夫卡實戰筆記
    該怎麼學習kafka呢?在學習kafka之前,最好能對kafka有一個簡單的了解,可以提出一些問題,帶著問題去學習,就會容易一些。kafka的結構使用kafka創建demokakfa客戶端請求是如何被處理的kafka中的組件coordinatorcontroller
  • kafka核心原理的秘密,藏在這16張圖裡
    Kafka 基本概念和架構問題簡單講下 Kafka 的架構?Kafka 是推模式還是拉模式,推拉的區別是什麼?Kafka 如何廣播消息?Kafka 的消息是否是有序的?kafka-configs.sh: 配置管理腳本kafka-console-consumer.sh: kafka 消費者控制臺kafka-console-producer.sh: kafka 生產者控制臺kafka-consumer-groups.sh: kafka 消費者組相關信息kafka-delete-records.sh
  • 華為架構師親手操刀,世界五百強都在用的kafka也就那麼回事
    但是市場上系統學習kafka的資料真的太少了,今天分享的這份kafka資料由華為架構師親手操刀,內容涵蓋源碼到實戰,堪稱一絕,後面附自己整理的kafka由基礎到架構的面試題Kafka源碼解析與實戰kafka