拓撲是通過流(邊緣)連接的流處理器(節點)的有向無環圖(DAG)。 DAG的一些關鍵特徵是它是有限的,並且不包含任何循環。 創建流式拓撲可以使數據處理器成為小型,專注的微服務,可以輕鬆地對其進行分配和擴展,並可以並行執行其工作。
Kafka Streams是由Confluent開發的API,用於構建使用Kafka主題,分析,轉換或豐富輸入數據然後將結果發送到另一個Kafka主題的流應用程式。 它使您可以使用簡潔的代碼以分布式且容錯的方式執行此操作。 Kafka Streams將處理器拓撲定義為流處理代碼的邏輯抽象。
> A simple Kafka Streams topology
· 流是無界的,不斷更新的數據集,由有序,可重播和容錯的鍵值對序列組成。
· 流處理器是拓撲中的一個節點,它一次從拓撲中的上遊處理器接收一個輸入記錄,對其應用操作,並可以選擇向其下遊處理器生成一個或多個輸出記錄。
· 源處理器是沒有任何上遊處理器的處理器。
· 接收器處理器是沒有任何下遊處理器的處理器。
在本教程中,我將使用Kafka和Kafka Streams的Java API。 我將假設您對使用Maven來構建Java項目有基本的了解,並且對Kafka有基本的了解,並且已經設置了Kafka實例。 Lenses.io提供了一種快速簡便的容器化解決方案,用於在此處設置Kafka實例。
首先,我們需要將kafka-clients和kafka-streams作為依賴項添加到項目pom.xml中:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.6.0</version></dependency><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version></dependency>
流拓撲需要一個或多個輸入,中間和輸出主題。 可在此處找到有關創建新Kafka主題的信息。 一旦創建了必要的主題,就可以創建流式拓撲。 這是為輸入主題創建拓撲的示例,其中值序列化為JSON(由GSON序列化/反序列化)。
import java.util.Properties;import com.google.gson.JsonObject;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;// ... //Properties props = new Properties();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCATION_OF_KAFKA_BROKERS);props.put(StreamsConfig.APPLICATION_ID_CONFIG, &34;);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, JsonObject> inputStream = builder.stream(inputTopic);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();
上面的示例是一個非常簡單的流式拓撲,但是到目前為止,它實際上並沒有做任何事情。 需要特別注意的是,拓撲是由執行前一代碼段的應用程式執行和保留的,該拓撲不會在Kafka代理中運行。 由創建的應用程式支付所有拓撲處理開銷。
可以通過執行以下命令來停止正在運行的拓撲:
streams.close();
為了使這種拓撲更加有用,我們需要定義基於規則的分支(或邊)。 在下一個示例中,我們基於JSON消息有效負載中特定欄位的值創建具有3個分支的基本拓撲。
public static final double SOME_CONSTANT = ...;// ... //// define conditions for each branch (edge) of topologyPredicate<String, JsonObject> greaterThan = (String key, JsonObject value) -> { double dValue = value.get(&34;).getAsDouble(); return dValue > SOME_CONSTANT;};Predicate<String, JsonObject> lessThan = (String key, JsonObject value) -> { double dValue = value.get(&34;).getAsDouble(); return dValue < SOME_CONSTANT;};Predicate<String, JsonObject> equalTo = (String key, JsonObject value) -> { double dValue = value.get(&34;).getAsDouble(); // epsilon is an arbitrarily small real number, such as 1e-15 return Math.abs(dValue - SOME_CONSTANT) < epsilon;};Predicate<String, JsonObject>[] conditions = new Predicate<>[] { greaterThan, lessThan, equalTo };KStream<String, JsonElement>[] branches = inputStream.branch(conditions);// define a output topic for each branch (edge)branches[0].to(&34;);branches[1].to(&34;);branches[2].to(&34;);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();
我們剛剛創建的拓撲如下圖所示:
上一個示例中分支的下遊使用者可以使用與任何其他Kafka主題完全相同的方式來使用分支主題。 下遊處理器可以產生自己的輸出主題。 因此,將下遊處理器的結果與原始輸入主題結合起來可能很有用。 我們還可以使用Kafka Streams API定義規則,以將結果輸出主題加入單個流中。
Kafka Streams通過SQL連接對其流連接功能進行建模。 共有三種連接:
· 內部聯接:當兩個輸入主題都有具有相同鍵的記錄時,發出輸出。
· 左連接:為左輸入或主輸入主題中的每個記錄發出輸出。 如果另一個主題沒有給定鍵的值,則將其設置為null。
· 外部聯接:為任一輸入主題中的每個記錄發出輸出。 如果只有一個源包含密鑰,則另一個為null。
對於我們的示例,我們將輸入記錄流和下遊處理器的結果連接在一起。 在這種情況下,最有意義的是將輸入主題視為主要主題進行左聯接。 這將確保加入的流始終輸出原始輸入記錄,即使沒有可用的處理器結果也是如此。
import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.JoinWindows;import org.apache.kafka.streams.kstream.KStream;// ... //StreamsBuilder builder = new StreamsBuilder();KStream<String, JsonObject> inputStream = builder.stream(inputTopic);KStream<String, JsonObject> resultStream = builder.stream(resultTopic);// left join with default serializers and deserializersKStream<String, JsonObject> joined = inputStream.leftJoin(resultStream, (inputValue, outputValue) -> &34; + inputValue + &34; + outputValue, /* ValueJoiner */ JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));joined.to(outputTopic);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();
最終的整體拓撲如下圖所示:
以編程方式可以使用相同的服務來創建和執行兩個流拓撲,但是在示例中我避免這樣做以使圖保持非循環狀態。
· Confluent文檔
· 了解Kafka流的流處理
· Java KafkaStreams簡介
(本文翻譯自Jason Snouffer的文章《Creating a streaming data pipeline with Kafka Streams》,參考:https://itnext.io/creating-a-streaming-data-pipeline-with-kafka-streams-898fb352a7b7)