使用Kafka Streams創建流數據管道

2020-09-07 聞數起舞

創建基於規則的流數據拓撲

什麼是流拓撲?

拓撲是通過流(邊緣)連接的流處理器(節點)的有向無環圖(DAG)。 DAG的一些關鍵特徵是它是有限的,並且不包含任何循環。 創建流式拓撲可以使數據處理器成為小型,專注的微服務,可以輕鬆地對其進行分配和擴展,並可以並行執行其工作。

為什麼要使用kafka Streams?

Kafka Streams是由Confluent開發的API,用於構建使用Kafka主題,分析,轉換或豐富輸入數據然後將結果發送到另一個Kafka主題的流應用程式。 它使您可以使用簡潔的代碼以分布式且容錯的方式執行此操作。 Kafka Streams將處理器拓撲定義為流處理代碼的邏輯抽象。

> A simple Kafka Streams topology

Kafka Streams的關鍵概念

· 流是無界的,不斷更新的數據集,由有序,可重播和容錯的鍵值對序列組成。

· 流處理器是拓撲中的一個節點,它一次從拓撲中的上遊處理器接收一個輸入記錄,對其應用操作,並可以選擇向其下遊處理器生成一個或多個輸出記錄。

· 源處理器是沒有任何上遊處理器的處理器。

· 接收器處理器是沒有任何下遊處理器的處理器。

入門

在本教程中,我將使用Kafka和Kafka Streams的Java API。 我將假設您對使用Maven來構建Java項目有基本的了解,並且對Kafka有基本的了解,並且已經設置了Kafka實例。 Lenses.io提供了一種快速簡便的容器化解決方案,用於在此處設置Kafka實例。

首先,我們需要將kafka-clients和kafka-streams作為依賴項添加到項目pom.xml中:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.6.0</version></dependency><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version></dependency>

建立流式拓撲

流拓撲需要一個或多個輸入,中間和輸出主題。 可在此處找到有關創建新Kafka主題的信息。 一旦創建了必要的主題,就可以創建流式拓撲。 這是為輸入主題創建拓撲的示例,其中值序列化為JSON(由GSON序列化/反序列化)。

import java.util.Properties;import com.google.gson.JsonObject;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;// ... //Properties props = new Properties();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCATION_OF_KAFKA_BROKERS);props.put(StreamsConfig.APPLICATION_ID_CONFIG, &34;);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, JsonObject> inputStream = builder.stream(inputTopic);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();

上面的示例是一個非常簡單的流式拓撲,但是到目前為止,它實際上並沒有做任何事情。 需要特別注意的是,拓撲是由執行前一代碼段的應用程式執行和保留的,該拓撲不會在Kafka代理中運行。 由創建的應用程式支付所有拓撲處理開銷。

可以通過執行以下命令來停止正在運行的拓撲:

streams.close();

為了使這種拓撲更加有用,我們需要定義基於規則的分支(或邊)。 在下一個示例中,我們基於JSON消息有效負載中特定欄位的值創建具有3個分支的基本拓撲。

public static final double SOME_CONSTANT = ...;// ... //// define conditions for each branch (edge) of topologyPredicate<String, JsonObject> greaterThan = (String key, JsonObject value) -> { double dValue = value.get(&34;).getAsDouble(); return dValue > SOME_CONSTANT;};Predicate<String, JsonObject> lessThan = (String key, JsonObject value) -> { double dValue = value.get(&34;).getAsDouble(); return dValue < SOME_CONSTANT;};Predicate<String, JsonObject> equalTo = (String key, JsonObject value) -> { double dValue = value.get(&34;).getAsDouble(); // epsilon is an arbitrarily small real number, such as 1e-15 return Math.abs(dValue - SOME_CONSTANT) < epsilon;};Predicate<String, JsonObject>[] conditions = new Predicate<>[] { greaterThan, lessThan, equalTo };KStream<String, JsonElement>[] branches = inputStream.branch(conditions);// define a output topic for each branch (edge)branches[0].to(&34;);branches[1].to(&34;);branches[2].to(&34;);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();

我們剛剛創建的拓撲如下圖所示:

上一個示例中分支的下遊使用者可以使用與任何其他Kafka主題完全相同的方式來使用分支主題。 下遊處理器可以產生自己的輸出主題。 因此,將下遊處理器的結果與原始輸入主題結合起來可能很有用。 我們還可以使用Kafka Streams API定義規則,以將結果輸出主題加入單個流中。

交叉流

Kafka Streams通過SQL連接對其流連接功能進行建模。 共有三種連接:

· 內部聯接:當兩個輸入主題都有具有相同鍵的記錄時,發出輸出。

· 左連接:為左輸入或主輸入主題中的每個記錄發出輸出。 如果另一個主題沒有給定鍵的值,則將其設置為null。

· 外部聯接:為任一輸入主題中的每個記錄發出輸出。 如果只有一個源包含密鑰,則另一個為null。

對於我們的示例,我們將輸入記錄流和下遊處理器的結果連接在一起。 在這種情況下,最有意義的是將輸入主題視為主要主題進行左聯接。 這將確保加入的流始終輸出原始輸入記錄,即使沒有可用的處理器結果也是如此。

import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.JoinWindows;import org.apache.kafka.streams.kstream.KStream;// ... //StreamsBuilder builder = new StreamsBuilder();KStream<String, JsonObject> inputStream = builder.stream(inputTopic);KStream<String, JsonObject> resultStream = builder.stream(resultTopic);// left join with default serializers and deserializersKStream<String, JsonObject> joined = inputStream.leftJoin(resultStream, (inputValue, outputValue) -> &34; + inputValue + &34; + outputValue, /* ValueJoiner */ JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));joined.to(outputTopic);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();

最終的整體拓撲如下圖所示:

以編程方式可以使用相同的服務來創建和執行兩個流拓撲,但是在示例中我避免這樣做以使圖保持非循環狀態。

參考文獻

· Confluent文檔

· 了解Kafka流的流處理

· Java KafkaStreams簡介

(本文翻譯自Jason Snouffer的文章《Creating a streaming data pipeline with Kafka Streams》,參考:https://itnext.io/creating-a-streaming-data-pipeline-with-kafka-streams-898fb352a7b7)

相關焦點

  • Spark Streaming|Flink|Storm|Kafka Streams如何選擇流處理框架
    根據最新的統計顯示,僅在過去的兩年中,當今世界上90%的數據都是在新產生的,每天創建2.5萬億字節的數據,並且隨著新設備,傳感器和技術的出現,數據增長速度可能會進一步加快。從技術上講,這意味著我們的大數據處理將變得更加複雜且更具挑戰性。
  • 生產環境使用Apache Kafka和Redis的流架構
    它適用於近實時系統,在該系統中,需要處理大量事件流,並將結果提交給大量的訂戶,每個訂戶都接收自己的流視圖。為了測試確實有數據正在通過系統傳輸,讓我們為Kafka Producer編寫代碼。我將使用一個非常基本的NodeJS程序包,該程序包可連接到Kafka的REST API。
  • Spring Boot 中文參考指南(2.1.6)附錄A、KAFKA、RABBIT屬性
    spring.artemis.embedded.queues= # 啟動時要創建的隊列的逗號分隔列表。spring.artemis.embedded.server-id= # 伺服器 ID。默認情況下,使用自動遞增計數器。spring.artemis.embedded.topics= # 啟動時要創建的主題的逗號分隔列表。
  • Kafka事務流由理論到實例
    我們可以使用Kafka的命令行工具創建一個主題並發布一些消息。我們可以使用docker exec工具對kafka容器進行操作方便地調用內置的CLI工具:docker exec -it kafka-kafdrop_kafka_1 bash上面的命令將讓我麼進入容器的shell命令行界面。
  • 淺談kafka
    一 kafka是什麼在流式計算中,Kafka一般用來緩存數據,sparkstreaming通過消費Kafka的數據進行計算。to output streams.即:應用程式使用producer API發布消息到1個或多個topic中。應用程式使用consumer API來訂閱一個或多個topic,並處理產生的消息。
  • 大數據必學知識之20道Kafka知識點
    1.kafka的3個關鍵功能?發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。以容錯的持久方式存儲記錄流。處理記錄流。2.kafka通常用於兩大類應用?建立實時流數據管道,以可靠地在系統或應用程式之間獲取數據構建實時流應用程式,以轉換或響應數據流3.kafka特性?
  • 使用Kafka和Kafka Stream設計高可用任務調度
    /更新,並發送給Kafka主題; 說任務時間表· 水平縮放的Kafka Streams應用程式將從主題分區讀取這些定義,並將它們存儲在各自的狀態存儲中· 這些Kafka Consumer應用程式將使用Kafka流處理器API在狀態存儲中管理這些任務定義· Transformer實現將定期調用Punctuator· 打孔器從存儲中讀取所有任務定義
  • 「事件驅動架構」GoldenGate創建從Oracle到Kafka的CDC事件流(2)
    步驟8/12:為大數據安裝GoldenGate同樣,從這個頁面下載Oracle GoldenGate for Big Data 12c只需要使用VM中安裝的Firefox瀏覽器(我在Linux x86-64上使用Oracle GoldenGate for Big Data 12.3.2.1.1)。
  • 分布式流平臺Kafka
    以容錯的持久化方式存儲消息流3. 在消息流產生時處理它們目前,Kafka通常應用於兩大類應用:1. 構建實時的流數據管道,可靠地在系統和應用程式之間獲取數據2.你可以認為kafka是一種高性能、低延遲的提交日誌存儲、備份和傳播功能的分布式文件系統,並且可以通過客戶端來控制讀取數據的位置。Kafka的流處理Kafka流處理不僅僅用來讀寫和存儲流式數據,它最終的目的是為了能夠進行實時的流處理。
  • Kafka 2.2.0基礎入門
    流處理平臺有以下三種特性:1. 可以發布和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。 2. 可以儲存流式的記錄,並且有較好的容錯性。 3. 可以在流式記錄產生時就進行處理。 Kafka適合什麼樣的場景?可以用於兩大類別的應用:1. 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。
  • Kafka 基本原理(8000 字小結)
    ,broker會創建新的segment。從代理刪除消息變得很棘手,因為代理並不知道消費者是否已經使用了該消息。Kafka創新性地解決了這個問題,它將一個簡單的基於時間的SLA應用於保留策略。當消息在代理中超過一定時間後,將會被自動刪除。這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費數據。
  • Kafka Python快速數據架構教程
    我使用的母版是mesos1.admintome.lab。接下來,我們將使用kafka-mesos.sh腳本創建主題:注意,API參數指向我們使用kafka-mesos創建的Kafka調度程序。你可以驗證自己現在是否具有正確的主題:我們的新主題已準備就緒!現在是時候學習有趣的東西並開始開發我們的Python應用程式了。
  • 大數據開發:Apache Kafka分布式流式系統
    Kafka在大數據流式處理場景當中,正在受到越來越多的青睞,尤其在實時消息處理領域,kafka的優勢是非常明顯的。相比於傳統的消息中間件,kafka有著更多的潛力空間。今天的大數據開發分享,我們就主要來講講Apache Kafka分布式流式系統。
  • Apache Kafka是 一個分布式流處理平臺. 這到底意味著什麼呢?
    我們知道流處理平臺有以下三種特性:可以讓你發布和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。可以儲存流式的記錄,並且有較好的容錯性。可以在流式記錄產生時就進行處理。Kafka適合什麼樣的場景?它可以用於兩大類別的應用:構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。
  • Kafka實時API探秘
    使用Confluent REST Proxy向 Kafka 生成數據只需要進行一個簡單的 REST 調用:curl -X POST \ -H "Content-Type: application/vnd.kafka.json.v2+json" \ -H "Accept: application/vnd.kafka.v2+json&
  • Canal+Kafka實現MySQL與Redis數據同步
    如果資料庫數據發生更新,這時候就需要在業務代碼中寫一段同步更新redis的代碼。這種數據同步的代碼跟業務代碼糅合在一起會不太優雅,能不能把這些數據同步的代碼抽出來形成一個獨立的模塊呢,答案是可以的。架構圖canal是一個偽裝成slave訂閱mysql的binlog,實現數據同步的中間件。上一篇文章《canal入門》我已經介紹了最簡單的使用方法,也就是tcp模式。
  • 玩了分布式這麼久,你不會連Kafka都不清楚吧
    作為流處理器。Kafka 可以建立流數據管道,可靠地在系統或應用之間獲取數據。建立流式應用傳輸和響應數據。為了在這樣的消息系統中傳輸數據,你需要有合適的數據管道:這種數據的交互看起來就很混亂,如果我們使用消息傳遞系統,那麼系統就會變得更加簡單和整潔。Kafka 運行在一個或多個數據中心的伺服器上作為集群運行:Kafka 集群存儲消息記錄的目錄被稱為 Topics。
  • Kafka-manager部署與使用簡單介紹
    是 Yahoo 推出的 Kafka 開源管理工具,用於管理Apache Kafka集群的工具,用戶可以在Web界面執行一些簡單的Kafka集群管理操作Kafka Manager支持以下內容:管理多個集群 輕鬆檢查群集狀態(主題,使用者,偏移量,代理,副本分發,分區分發)運行首選副本選擇 生成帶有選項的分區分配
  • springboot整合Kafka,使用zookeeper做服務治理
    ;/version> </dependency>2.在springboot配置中加入kafka相關配置,springboot啟動時候會自動加載這些配置,完成連結kafka,創建producer,consumer等。
  • 大數據框架Spark的流處理SparkStreaming詳細總結
    其中,批式大數據又被稱為歷史大數據,流式大數據又被稱為實時大數據。大數據技術就是處理海量數據並獲取其中的價值,但這些價值並非完全一樣。一些數據在發生後不久更有價值並隨著時間推移其價值迅速下降。流處理支持這樣的場景,提供更快的有價值信息,通常在從觸發器開始的幾毫秒到幾秒內。2、為什麼使用流處理(1)有些數據天然地作為無止盡事件流出現。