原文地址:https://www.jdon.com/55160
原文作者:banq
這篇文章描述了基於Apache Kafka和Redis的體系結構如何應用於構建高性能,彈性流系統。它適用於近實時系統,在該系統中,需要處理大量事件流,並將結果提交給大量的訂戶,每個訂戶都接收自己的流視圖。
示例可能包括以下內容:
該體系結構假設數據量很大,可能需要計算量大的步驟才能計算出各個視圖。該體系結構假定reducer(負責計算的組件)可以獨立擴展並通過重新啟動從故障中恢復。它們的無狀態性質和動態擴展使它們非常適合在Kubernetes集群中進行部署。
上圖使用流系統的術語將博彩賠率分發給所有連接的Web和移動客戶端。
總體而言,該系統由幾個部分組成,這些部分協同工作並獨立擴展:
Apache Kafka
Apache Kafka允許以容錯,分布式的方式將輸入的數據流和計算結果存儲在管道的後續階段。Apache Kafka還允許在高數據負載的情況下將新伺服器連接到系統。
使用Kafka開始進行有趣的項目的一種簡單方法是使用docker-compose,其設置類似於以下內容:
version : '3.5'services: zookeeper: image: <font>"confluentinc/cp-zookeeper"</font><font> environment: - ZOOKEEPER_CLIENT_PORT=2181 kafka: image: </font><font>"confluentinc/cp-kafka"</font><font> environment: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:</font><font><i>//kafka:9092</i></font><font> - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 kafka_<b>rest</b>: image: </font><font>"confluentinc/cp-kafka-rest"</font><font> ports: - </font><font>"8082:8082"</font><font> environment: - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_REST_LISTENERS=http:</font><font><i>//0.0.0.0:8082</i></font><font> - KAFKA_REST_HOST_NAME=kafka_<b>rest</b>networks: <b>default</b>: name: oda</font>
請注意,以上dockerfile中的競爭條件(Zookeeper啟動速度比Kafka慢)導致Kafka服務無法啟動,必須使用docker-compose scale kafka=1重新啟動。服務啟動後,我們可以如下測試配置:
docker run --net=oda --rm confluentinc/cp-kafka bash -c <font>"seq 42 | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic foo && echo 'Produced 42 messages.'"</font><font>docker run --net=oda --rm confluentinc/cp-kafka kafka-console-consumer --bootstrap-server kafka:9092 --topic foo --from-beginning --max-messages 42</font>
下面是一個短節點腳本,它通過提供默認參數來與上面介紹的dockerfile一起使用,簡化了使用kafka控制臺工具的操作:
#!/usr/local/bin/nodelet child_process = require('child_process');let cmds = { 'topics' : ['kafka-topics', '--zookeeper', 'zookeeper:2181'], 'produce' : ['kafka-console-producer', '--broker-list', 'kafka:9092'], 'consume' : ['kafka-console-consumer', '--bootstrap-server', 'kafka:9092']}let params = <b>null</b>;process.argv.forEach((arg) => { <font><i>// first with command</i></font><font> <b>if</b> (params == <b>null</b> && cmds[arg] != <b>null</b>) { let cmd = cmds[arg]; params = ['run', '--net=oda', '--rm', '-it', 'confluentinc/cp-kafka', ...cmd] } </font><font><i>// add the rest</i></font><font> <b>if</b> (params != <b>null</b>){ params.push(arg); }});let docker = child_process.spawn('docker', params, {stdio: 'inherit'});docker.on('error', (err) => { console.log('Failed to start docker');});docker.on('close', (code) => { console.log(`Child process exited with code ${code}`);});</font>
這樣可以輕鬆進行如下測試:
Zookeeper可以並且也應該縮放。由於需要仲裁,用於確定Zookeeper實例數來運行式是2 * F + 1其中F是所期望的容錯因子。
為了測試確實有數據正在通過系統傳輸,讓我們為Kafka Producer編寫代碼。我將使用一個非常基本的NodeJS程序包,該程序包可連接到Kafka的REST API。對於生產用途,應首選更高級的軟體包,例如kafka-node撰寫本文時快速增長的軟體包kafkajs。我在這裡使用該kafka-rest軟體包是為了簡化和方便。
<font><i>// kafka rest is exposed from docker-compose on 8082</i></font><font><b>const</b> kafka = <b>new</b> KafkaRest({ 'url': 'http:</font><font><i>//localhost:8082' });</i></font><font></font><font><i>// make sure the topic is created before</i></font><font><b>const</b> target = kafka.topic('random_walk');<b>const</b> randWalker = function(){ function clamp(v, min, max){ <b>if</b> (v < min) <b>return</b> min; <b>if</b> (v > max) <b>return</b> max; <b>return</b> v; } let dir = 0; let prevStep = 0; <b>const</b> rnd = require('random'); <b>return</b> { randomWalk(){ dir = clamp(rnd.normal(dir, 1)(), -0.75, 0.75); prevStep += dir; <b>return</b> prevStep; } }}();setInterval(()=> { </font><font><i>// very basic producer, no key, no partitions, just straight round robin</i></font><font> target.produce( randWalker.randomWalk().toFixed(4) );}, 1000);</font>
一個簡單的消費使用者可以實現如下:
<b>const</b> KafkaRest = require('kafka-<b>rest</b>');<b>const</b> kafka = <b>new</b> KafkaRest({ 'url': 'http:<font><i>//localhost:8082' });</i></font><font> </font><font><i>// start reading from the beginning</i></font><font>let consumerConfig = { 'auto.offset.reset' : 'smallest'};</font><font><i>// join a consumer group on the matches topic</i></font><font>kafka.consumer(</font><font>"consumer-group"</font><font>).join(consumerConfig, function(err, instance) { <b>if</b> (err) <b>return</b> console.log(</font><font>"Failed to create instance in consumer group: "</font><font> + err); console.log(</font><font>"Consumer instance initialized: "</font><font> + instance.toString()); <b>const</b> stream = instance.subscribe(</font><font>"random_walk"</font><font>); stream.on('data', function(msgs) { <b>for</b>(<b>var</b> i = 0; i < msgs.length; i++) { let key = msgs[i].key.toString('utf8'); let value = msgs[i].value.toString('utf8'); console.log(`${key} : ${value}`); } }); });</font>
消費組/使用組提供了構建提議的體系結構的核心抽象。使用方組允許一組並發進程使用來自Kafka主題的消息,同時,確保不會為兩個使用方分配相同的分區列表。由於流量波動以及由於使用者崩潰導致的重啟,因此可以無縫地擴大和縮小使用者組。使用者組中的使用者會收到一個rebalance通知回調,其中包含分配給他們的分區列表,並且他們可以從分區的開頭,該組的另一個成員最後提交的偏移量或自管理的分區中恢復使用抵消。
上面的代碼提供了一種非常簡單的方法來檢查使用者組的工作方式。只需啟動幾個競爭的消費者並殺死其中一個,或稍後再重新啟動即可。由於重新啟動策略設置為主題的開頭,因此'auto.offset.reset' : 'smallest'每次使用都會從每個分區的開頭開始。
Redis注意點
僅通過使用Redis管道儘可能多地批量更新Redis,架構才能實現其最高吞吐量。
對於其他方案,僅在Redis更新後才可以提交Kafka偏移,以犧牲吞吐量來確保正確性。
主要命令和數據流
在本節中,我們將採用賠率更新方案(sportsbook),在這種情況下,更新需要儘快推送到偵聽的前端。
一個簡單的場景:用戶想要訂閱單個市場的更改
Reducers可能會在內存中維護更新的市場值的副本,以進行快速訪問和進程內緩存,但是對於每個訂閱的市場,其值也必須保存在進程外的Redis HA集群或分片的MongoDB中。訂閱時,如果reducer尚未存儲市場,即尚未有其他訂閱者訂閱,則必須首先在共享的Redis或Mongo中查找該市場。在來自Kafka主題的新進入市場更新中,必須首先更新Redis / Mongo。如果無法跳過更新,則僅在Redis / Mongo寫入成功後才提交偏移量。訂閱也保存在Redis / Mongo中,以使Reducer重新啟動,放大或縮小。
為了處理reducer的重新啟動,reducer知道了分區邏輯,因此在讀取訂閱信息時,它將知道它將服務的市場ID類別和忽略的市場ID。如果在將消息發布給訂閱客戶端之後發送對Kafka的ACK,則reducer可能會選擇延遲初始化,而不會從Redis為其分配的市場中讀取當前狀態。如果訂閱率很高,那麼reducer可能會選擇在Redis的另一個集合中存儲一份受歡迎的市場列表,以期渴望初始化時閱讀,但在我看來,這不僅僅是一種優化,而不僅僅是必須實現的功能。從頭開始。
變化
替代方法
可以用於原型製作和相對穩定的MVP的另一種方法是使用諸如MongoDB流(或Firebase或RethinkDB)之類的東西來監聽對集合的更改,在該集合中狀態被存儲和修改。每個文檔都是變更的基本單位。客戶可以訂閱多個文檔。所有更改都會傳播到所有發布者,發布者會確定哪些是正確的收件人。
這種對體系結構的支持使模型更加簡單,使開發人員無需管理複雜的問題(例如持久性,同步和還原),同時減少了依賴項的數量並簡化了操作。
原文地址:https://www.jdon.com/55160
原文作者:banq