生產環境使用Apache Kafka和Redis的流架構

2020-10-13 閃念基因

原文地址:https://www.jdon.com/55160

原文作者:banq

這篇文章描述了基於Apache Kafka和Redis的體系結構如何應用於構建高性能,彈性流系統。它適用於近實時系統,在該系統中,需要處理大量事件流,並將結果提交給大量的訂戶,每個訂戶都接收自己的流視圖。

示例可能包括以下內容:

  • 流化莊家賠率-不同用戶瀏覽網站的不同部分,其投注單可增加不同的市場
  • 實時遊戲-根據玩家的輸入和遊戲規則,為每個玩家計算一個不同的世界視圖
  • 基於訂閱的數據分發,每個使用者接收總數據集的一個分區

該體系結構假設數據量很大,可能需要計算量大的步驟才能計算出各個視圖。該體系結構假定reducer(負責計算的組件)可以獨立擴展並通過重新啟動從故障中恢復。它們的無狀態性質和動態擴展使它們非常適合在Kubernetes集群中進行部署。

上圖使用流系統的術語將博彩賠率分發給所有連接的Web和移動客戶端。

總體而言,該系統由幾個部分組成,這些部分協同工作並獨立擴展:

  • 流控制API:可以實現為正常的REST服務,負載均衡。
  • 流發布者:它接受WebSockets連接,負載均衡,單個連接可以在任何計算機上運行。
  • Redis PUB-SUB組件是通道所在。最終,可以將其分片或替換為RabbitMQ集群。發布者和Redis PUB-SUB可以替換為socket.io。
  • 一個Kafka隊列,其中包含兩個主題:一個用於流命令(分配給所有的reducer),另一個是分區的主題,所有的reducer都從中使用各自的分區而不重疊(我們稱其為data topic)。本主題接收的數據量最大。為了實現良好的負載平衡,建議分區數量很多。
  • Reducer本身會消耗數據主題中的非重疊分區。
  • 狀態存儲地方可以是HA Redis集群、MongoDB或任何非常快速的鍵值狀態。

Apache Kafka

Apache Kafka允許以容錯,分布式的方式將輸入的數據流和計算結果存儲在管道的後續階段。Apache Kafka還允許在高數據負載的情況下將新伺服器連接到系統。

  • 如果進行複製,則每個分區將有一個伺服器作為領導者,而可將其他伺服器配置為跟隨者。領導者管理特定分區的讀/寫請求,而跟隨者管理領導者的複製。
  • 在Kafka中,領導層是按分區定義的。這意味著伺服器可以是分區的領導者,而另一個可以是跟隨者。
  • Zookeeper按主題存儲消費者補償。

使用Kafka開始進行有趣的項目的一種簡單方法是使用docker-compose,其設置類似於以下內容:

version : '3.5'services: zookeeper: image: <font>"confluentinc/cp-zookeeper"</font><font> environment: - ZOOKEEPER_CLIENT_PORT=2181 kafka: image: </font><font>"confluentinc/cp-kafka"</font><font> environment: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:</font><font><i>//kafka:9092</i></font><font> - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 kafka_<b>rest</b>: image: </font><font>"confluentinc/cp-kafka-rest"</font><font> ports: - </font><font>"8082:8082"</font><font> environment: - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_REST_LISTENERS=http:</font><font><i>//0.0.0.0:8082</i></font><font> - KAFKA_REST_HOST_NAME=kafka_<b>rest</b>networks: <b>default</b>: name: oda</font>

請注意,以上dockerfile中的競爭條件(Zookeeper啟動速度比Kafka慢)導致Kafka服務無法啟動,必須使用docker-compose scale kafka=1重新啟動。服務啟動後,我們可以如下測試配置:

docker run --net=oda --rm confluentinc/cp-kafka bash -c <font>"seq 42 | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic foo && echo 'Produced 42 messages.'"</font><font>docker run --net=oda --rm confluentinc/cp-kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic foo --from-beginning --max-messages 42</font>

下面是一個短節點腳本,它通過提供默認參數來與上面介紹的dockerfile一起使用,簡化了使用kafka控制臺工具的操作:

#!/usr/local/bin/nodelet child_process = require('child_process');let cmds = { 'topics' : ['kafka-topics', '--zookeeper', 'zookeeper:2181'], 'produce' : ['kafka-console-producer', '--broker-list', 'kafka:9092'], 'consume' : ['kafka-console-consumer', '--bootstrap-server', 'kafka:9092']}let params = <b>null</b>;process.argv.forEach((arg) => { <font><i>// first with command</i></font><font> <b>if</b> (params == <b>null</b> && cmds[arg] != <b>null</b>) { let cmd = cmds[arg]; params = ['run', '--net=oda', '--rm', '-it', 'confluentinc/cp-kafka', ...cmd] } </font><font><i>// add the rest</i></font><font> <b>if</b> (params != <b>null</b>){ params.push(arg); }});let docker = child_process.spawn('docker', params, {stdio: 'inherit'});docker.on('error', (err) => { console.log('Failed to start docker');});docker.on('close', (code) => { console.log(`Child process exited with code ${code}`);});</font>

這樣可以輕鬆進行如下測試:

  • 創建一個名為的主題test:./kafka.js topics --create --topic test --replication-factor 3 --partitions 3
  • 列出主題: ./kafka.js topics --list
  • 產生一些消息: ./kafka.js produce --topic test
  • 要從頭開始閱讀消息: ./kafka.js consume --topic test --from-beginning

Zookeeper可以並且也應該縮放。由於需要仲裁,用於確定Zookeeper實例數來運行式是2 * F + 1其中F是所期望的容錯因子。

為了測試確實有數據正在通過系統傳輸,讓我們為Kafka Producer編寫代碼。我將使用一個非常基本的NodeJS程序包,該程序包可連接到Kafka的REST API。對於生產用途,應首選更高級的軟體包,例如kafka-node撰寫本文時快速增長的軟體包kafkajs。我在這裡使用該kafka-rest軟體包是為了簡化和方便。

<font><i>// kafka rest is exposed from docker-compose on 8082</i></font><font><b>const</b> kafka = <b>new</b> KafkaRest({ 'url': 'http:</font><font><i>//localhost:8082' });</i></font><font></font><font><i>// make sure the topic is created before</i></font><font><b>const</b> target = kafka.topic('random_walk');<b>const</b> randWalker = function(){ function clamp(v, min, max){ <b>if</b> (v < min) <b>return</b> min; <b>if</b> (v > max) <b>return</b> max; <b>return</b> v; } let dir = 0; let prevStep = 0; <b>const</b> rnd = require('random'); <b>return</b> { randomWalk(){ dir = clamp(rnd.normal(dir, 1)(), -0.75, 0.75); prevStep += dir; <b>return</b> prevStep; } }}();setInterval(()=> { </font><font><i>// very basic producer, no key, no partitions, just straight round robin</i></font><font> target.produce( randWalker.randomWalk().toFixed(4) );}, 1000);</font>

一個簡單的消費使用者可以實現如下:

<b>const</b> KafkaRest = require('kafka-<b>rest</b>');<b>const</b> kafka = <b>new</b> KafkaRest({ 'url': 'http:<font><i>//localhost:8082' });</i></font><font> </font><font><i>// start reading from the beginning</i></font><font>let consumerConfig = { 'auto.offset.reset' : 'smallest'};</font><font><i>// join a consumer group on the matches topic</i></font><font>kafka.consumer(</font><font>"consumer-group"</font><font>).join(consumerConfig, function(err, instance) { <b>if</b> (err) <b>return</b> console.log(</font><font>"Failed to create instance in consumer group: "</font><font> + err); console.log(</font><font>"Consumer instance initialized: "</font><font> + instance.toString()); <b>const</b> stream = instance.subscribe(</font><font>"random_walk"</font><font>); stream.on('data', function(msgs) { <b>for</b>(<b>var</b> i = 0; i < msgs.length; i++) { let key = msgs[i].key.toString('utf8'); let value = msgs[i].value.toString('utf8'); console.log(`${key} : ${value}`); } }); });</font>

消費組/使用組提供了構建提議的體系結構的核心抽象。使用方組允許一組並發進程使用來自Kafka主題的消息,同時,確保不會為兩個使用方分配相同的分區列表。由於流量波動以及由於使用者崩潰導致的重啟,因此可以無縫地擴大和縮小使用者組。使用者組中的使用者會收到一個rebalance通知回調,其中包含分配給他們的分區列表,並且他們可以從分區的開頭,該組的另一個成員最後提交的偏移量或自管理的分區中恢復使用抵消。

上面的代碼提供了一種非常簡單的方法來檢查使用者組的工作方式。只需啟動幾個競爭的消費者並殺死其中一個,或稍後再重新啟動即可。由於重新啟動策略設置為主題的開頭,因此'auto.offset.reset' : 'smallest'每次使用都會從每個分區的開頭開始。

Redis注意點

僅通過使用Redis管道儘可能多地批量更新Redis,架構才能實現其最高吞吐量。

對於其他方案,僅在Redis更新後才可以提交Kafka偏移,以犧牲吞吐量來確保正確性。

主要命令和數據流

在本節中,我們將採用賠率更新方案(sportsbook),在這種情況下,更新需要儘快推送到偵聽的前端。

一個簡單的場景:用戶想要訂閱單個市場的更改

  1. 訂閱伺服器向REST控制API發出「訂閱」命令,並且向其發出唯一的通道ID(例如GUID)
  2. 通過扇出機制(命令主題)進一步向所有減速器reducer發出subscription命令。
  3. 訂閱者打開到Stream Publisher的Websocket,並請求將其連接映射到頻道ID。流發布者使用該特定的連接ID訂閱Redis PUB-SUB通道。到目前為止,還沒有數據發布到Redis PUB-SUB。
  4. 一旦訂閱者收到確認已建立連接的ACK,它將向命令API發出另一個命令「begin stream」。這樣做是為了指示簡化程序計算初始狀態,並在訂戶打開連接後通過pub-sub發送它,因此不會丟失任何更新。
  5. 精簡器維護兩個映射:通道id和連接到市場的通道ID,因此對於每個連接進來的市場,它都直接導向其訂閱通道,並且斷開連接得到了適當的管理,而不會留下內存洩漏。

Reducers可能會在內存中維護更新的市場值的副本,以進行快速訪問和進程內緩存,但是對於每個訂閱的市場,其值也必須保存在進程外的Redis HA集群或分片的MongoDB中。訂閱時,如果reducer尚未存儲市場,即尚未有其他訂閱者訂閱,則必須首先在共享的Redis或Mongo中查找該市場。在來自Kafka主題的新進入市場更新中,必須首先更新Redis / Mongo。如果無法跳過更新,則僅在Redis / Mongo寫入成功後才提交偏移量。訂閱也保存在Redis / Mongo中,以使Reducer重新啟動,放大或縮小。

為了處理reducer的重新啟動,reducer知道了分區邏輯,因此在讀取訂閱信息時,它將知道它將服務的市場ID類別和忽略的市場ID。如果在將消息發布給訂閱客戶端之後發送對Kafka的ACK,則reducer可能會選擇延遲初始化,而不會從Redis為其分配的市場中讀取當前狀態。如果訂閱率很高,那麼reducer可能會選擇在Re​dis的另一個集合中存儲一份受歡迎的市場列表,以期渴望初始化時閱讀,但在我看來,這不僅僅是一種優化,而不僅僅是必須實現的功能。從頭開始。

變化

  • 訂戶能夠訂閱未知的市場-例如,所有即將開始的足球比賽。在這種方法中,reducer分為兩步:第一步,計算與查詢匹配的市場ID,並代表客戶充當虛擬訂戶;第二步,上述步驟,將市場發送給訂閱客戶。
  • 複雜查詢的視圖(例如即將到來的比賽頁面)通過REST發布,然後通過CDN發布,以進行快速的初始加載以及時間戳。因此,訂戶a)知道要訂閱哪些市場,因為它們已經在頁面中,並且b)可以使用時間戳開始只接收比其已有的更新新的更新。這種方法大大減少了首次加載的時間。
  • reducer不是發布市場,而是在用戶操作和比賽事件的驅動下轉變為遊戲狀態。在這種情況下,命令隊列成為分配給所有reducer的匹配事件的隊列,賠率隊列成為用戶操作的接收者,但常規模式保持不變。

替代方法

可以用於原型製作和相對穩定的MVP的另一種方法是使用諸如MongoDB流(或Firebase或RethinkDB)之類的東西來監聽對集合的更改,在該集合中狀態被存儲和修改。每個文檔都是變更的基本單位。客戶可以訂閱多個文檔。所有更改都會傳播到所有發布者,發布者會確定哪些是正確的收件人。

這種對體系結構的支持使模型更加簡單,使開發人員無需管理複雜的問題(例如持久性,同步和還原),同時減少了依賴項的數量並簡化了操作。


原文地址:https://www.jdon.com/55160

原文作者:banq

相關焦點

  • Docker + Nodejs + Kafka + Redis + MySQL搭建簡單秒殺環境
    安裝redis,kafka, zookeeper和mysql為了方便搭建環境,這幾個組件會以docker container的形式啟動。在此之前需要去docker官網下載並安裝docker engine,docker machine和docker compose。
  • 使用Kafka Streams創建流數據管道
    創建流式拓撲可以使數據處理器成為小型,專注的微服務,可以輕鬆地對其進行分配和擴展,並可以並行執行其工作。為什麼要使用kafka Streams?Kafka Streams是由Confluent開發的API,用於構建使用Kafka主題,分析,轉換或豐富輸入數據然後將結果發送到另一個Kafka主題的流應用程式。 它使您可以使用簡潔的代碼以分布式且容錯的方式執行此操作。 Kafka Streams將處理器拓撲定義為流處理代碼的邏輯抽象。
  • Canal+Kafka實現MySQL與Redis數據同步
    架構圖canal是一個偽裝成slave訂閱mysql的binlog,實現數據同步的中間件。上一篇文章《canal入門》我已經介紹了最簡單的使用方法,也就是tcp模式。而canal的RabbitMQ模式目前是有一定的bug,所以一般使用Kafka或者RocketMQ。
  • 從零開始搭建Kafka+SpringBoot分布式消息系統
    /usr/local/kafka/wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz下載成功之後解壓:-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients
  • springboot + kafka的使用
    kafka broker發消息的客戶端;2)Consumer :消息消費者,向kafka broker取消息的客戶端;3)Topic :可以理解為一個隊列;4) Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。
  • 「事件驅動架構」GoldenGate創建從Oracle到Kafka的CDC事件流(2)
    「事件驅動架構」GoldenGate創建從Oracle到Kafka的CDC事件流(1)步驟7/12:安裝並運行Apache Kafka從VM的桌面環境中打開Firefox並下載Apache Kafka(我使用的是kafka_2.11-2.1.1.tgz)。
  • 在邊緣處部署Kafka的用例與架構
    實現對實時消息/事件流的傳遞和背壓處理。這取決於您正在使用的:硬體供應商、基礎架構、特定的SLA、以及高可用性要求等諸多因素。值得慶幸的是,Kafka可以被部署在包括裸金屬(bare metal)、虛擬機、容器、Kubernetes在內的許多基礎架構中。而當前廠商能夠生產的、可被用在邊緣處的最小晶片通常具有4GB、8GB、甚至16GB的RAM。
  • flink消費kafka的offset與checkpoint
    生產環境有個作業,邏輯很簡單,讀取kafka的數據,然後使用hive catalog,實時寫入hbase,hive,redis。使用的flink版本為1.11.1。為了防止寫入hive的文件數量過多,我設置了checkpoint為30分鐘。
  • Apache Kafka實戰
    這一切都是關於以可靠,快速和可擴展的方式傳輸大量數據的。在計算世界中,傳輸數據意味著消息傳遞。 Kafka用於高吞吐量用例,以可伸縮且容錯的方式移動大量數據。消息傳遞中的挑戰與局限消息傳遞是在應用程式和數據存儲之間傳輸數據的相當簡單的範例。
  • kafka入門(原理-搭建-簡單使用)
    前言公司在用kafka接受和發送數據,自己學習過Rabbitmq,不懂kafka挺不爽的,說幹就幹!網上找了許多帖子,學習了很多,小小的demo自己也搭建起來了,美滋滋,下面我認為優秀的網站和自己的步驟展現給大家。
  • 一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐
    擴展:關於消息語義注意:開發中SparkStreaming和kafka集成有兩個版本:0.8及0.10+0.8版本有Receiver和Direct模式(但是0.8版本生產環境問題較多,在Spark2.3之後不支持0.8版本了)。
  • 62.Kafka消息隊列訂閱發布
    The Streams API 允許一個應用程式作為一個流處理器,消費一個或者多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換.).broker.id=0 使用的認證協議security.inter.broker.protocol=SASL_PLAINTEXT 完成身份驗證的類authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer此文件是服務端 設置用戶名和密碼KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule
  • Apache Kafka客戶端KafkaProducer
    }  KafkaProducer在什麼地方對數據做的分區(切分):kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java看看最簡單的兩個構造方法:/
  • Apache+MySQL+Redis+nodejs
    Linux+Apache+MySQL+Redis+nodejscentos7.8apache(httpd-2.4.39)Mysql-5.7.29Redis-5.0.7nodejs(node-12.2.0)
  • Kafka 2.2.0基礎入門
    流處理平臺有以下三種特性:1. 可以發布和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。 2. 可以儲存流式的記錄,並且有較好的容錯性。 3. 可以在流式記錄產生時就進行處理。 Kafka適合什麼樣的場景?可以用於兩大類別的應用:1. 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。
  • 單機版kafka集群部署
    實現高性能、高可用、可伸縮和最終一致性架構。今天跟大家講解下如何部署單機kafka集群,希望大家喜歡。Kafka 是由 LinkedIn 開發的一個分布式的消息系統,使用 Scala 編寫,它以可水平擴展和高吞吐率而被廣泛使用。
  • 想做架構師都要學什麼?這些知識你一定要知道(上篇)
    特別是對於3~5年經驗的同學,一般公司都會要求有一定積累,同時對項目有初步的架構能力。很多同學覺得做技術管理不適合自己,想往架構師、技術專家的方向發展,卻苦於不知道學習什麼。本文精心整理了作為架構師應該了解的一些技術和框架。希望對各位讀者有所幫助。本篇涵蓋10+個方面,分為上中下三篇。
  • Apache Storm 0.9.6/0.10.0 發布
    Apache Storm 0.10.0 發布,此版本是個穩定版本,相比之前的 Beta 版本主要包括 bug 修復和改進:STORM-1108: Fix NPE in simulated
  • Kafka知識整理
    消息隊列消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題實現高性能,高可用,可伸縮和最終一致性架構使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ作用主要有:解耦,
  • 大數據從業人員必要技能之Kafka
    當前Kafka最重要的兩大客戶端是 Java客戶端和libkafka客戶端 ,它們更新和維護的速度很快,非常適合我們持續花時間投入。下一步就可以嘗試修改樣例代碼嘗試去理解並使用其他的API,之後觀測修改樣例代碼後的執行結果。如果這些都難不倒我們,接著就可以自己編寫一個小型項目來驗證下學習成果,然後就是改善和提升客戶端的可靠性和性能了。到了這一步,後面就可以熟讀一遍Kafka官網文檔,確保理解了那些可能影響可靠性和性能的參數。