Apache Beam實戰指南 | 手把手教你玩轉KafkaIO與Flink

2021-02-13 AI前線
AI 前線導讀: 本文是 Apache Beam 實戰指南系列文章 第二篇,將重點介紹 Apache Beam 與 Flink 的關係,對 Beam 框架中的 KafkaIO 和 Flink 源碼進行剖析,並結合應用示例和代碼解讀帶你進一步了解如何結合 Beam 玩轉 Kafka 和 Flink。系列文章第一篇回顧《Apache Beam 實戰指南之基礎入門》。

更多優質內容請關注微信公眾號「AI 前線」(ID:ai-front)

關於 Apache Beam 實戰指南系列文章

隨著大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。近年來湧現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者經常要用到不同的技術、框架、API、開發語言和 SDK 來應對複雜應用的開發,這大大增加了選擇合適工具和框架的難度,開發者想要將所有的大數據組件熟練運用幾乎是一項不可能完成的任務。

面對這種情況,Google 在 2016 年 2 月宣布將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣布開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對於 Beam 還缺乏了解,社區中文資料也比較少。InfoQ 期望通過 Apache Beam 實戰指南系列文章 推動 Apache Beam 在國內的普及。

大數據發展趨勢從普通的大數據,發展成 AI 大數據,再到下一代號稱萬億市場的 lOT 大數據。技術也隨著時代的變化而變化,從 Hadoop 的批處理,到 Spark Streaming,以及流批處理的 Flink 的出現,整個大數據架構也在逐漸演化。

Apache Beam 作為新生技術,在這個時代會扮演什麼樣的角色,跟 Flink 之間的關係是怎樣的?Apache Beam 和 Flink 的結合會給大數據開發者或架構師們帶來哪些意想不到的驚喜呢?

圖 2-1 MapReduce 流程圖

最初做大數據是把一些日誌或者其他信息收集後寫入 Hadoop 的 HDFS 系統中,如果運營人員需要報表,則利用 Hadoop 的 MapReduce 進行計算並輸出,對於一些非計算機專業的統計人員,後期可以用 Hive 進行統計輸出。

圖 2-2Storm 流程圖

業務進一步發展,運營人員需要看到實時數據的展示或統計。例如電商網站促銷的時候,用於統計用戶實時交易數據。數據收集也使用 MQ,用流式 Storm 解決這一業務需求問題。

圖 2-3 Spark 流程圖

業務進一步發展,服務前端加上了網關進行負載均衡,消息中心也換成了高吞吐量的輕量級 MQ Kafka,數據處理漸漸從批處理發展到微批處理。

圖 2-4 Flink 流程圖

隨著 AI 和 loT 的發展,對於傳感設備的信息、報警器的警情以及視頻流的數據量微批計算引擎已經滿足不了業務的需求,Flink 實現真正的流處理讓警情更實時。

2.5 下一代大數據處理統一標準 Apache Beam

圖 2-5 Apache Beam 流程圖

BeamSDKs 封裝了很多的組件 IO,也就是圖左邊這些重寫的高級 API,使不同的數據源的數據流向後面的計算平臺。通過將近一年的發展,Apache Beam 不光組件 IO 更加豐富了,並且計算平臺在當初最基本的 Apache Apex、Direct Runner、Apache Flink、Apache Spark、Google Cloud Dataflow 之上,又增加了 Gearpump、Samza 以及第三方的 JStorm 等計算平臺。

為什麼說 Apache Beam 會是大數據處理統一標準呢?

因為很多現在大型公司都在建立自己的「大中臺」,建立統一的數據資源池,打通各個部門以及子公司的數據,以解決信息孤島問題,把這些數據進行集中式管理並且進行後期的數據分析、BI、AI 以及機器學習等工作。這種情況下會出現很多數據源,例如之前用的 MySQL、MongodDB、HDFS、HBase、Solr 等,如果想建立中臺就會是一件令人非常苦惱的事情,並且多計算環境更是讓技術領導頭疼。Apache Beam 的出現正好迎合了這個時代的新需求,它集成了很多資料庫常用的數據源並把它們封裝成 SDK 的 IO,開發人員沒必要深入學習很多技術,只要會寫 Beam 程序就可以了,大大節省了人力、時間以及成本。

三.Apache Beam 和 Flink 的關係

隨著阿里巴巴 Blink 的開源,Flink 中國社區開始活躍起來。很多人會開始對各種計算平臺進行對比,比如 Storm、Spark、JStorm、Flink 等,並且有人提到之前阿里巴巴開源的 JStorm 比 Flink 性能高出 10-15 倍,為什麼阿里巴巴卻轉戰基於 Flink 的 Blink 呢? 在最近 Flink 的線下技術會議上,阿里巴巴的人已經回答了這一問題。其實很多技術都是從業務實戰出來的,隨著業務的發展可能還會有更多的計算平臺出現,沒有必要對此過多糾結。

不過,既然大家最近討論得這麼火熱,這裡也列出一些最近問的比較多的、有代表性的關於 Beam 的問題,逐一進行回答。

1. Flink 支持 SQL,請問 Beam 支持嗎?

現在 Beam 是支持 SQL 處理的,底層技術跟 Flink 底層處理是一樣的。

Beam SQL 現在只支持 Java,底層是 Apache Calcite 的一個動態數據管理框架,用於大數據處理和一些流增強功能,它允許你自定義資料庫功能。例如 Hive 使用了 Calcite 的查詢優化,當然還有 Flink 解析和流 SQL 處理。Beam 在這之上添加了額外的擴展,以便輕鬆利用 Beam 的統一批處理 / 流模型以及對複雜數據類型的支持。 以下是 Beam SQL 具體處理流程圖:

Beam SQL 一共有兩個比較重要的概念:

SqlTransform:用於 PTransforms 從 SQL 查詢創建的接口。

Row:Beam SQL 操作的元素類型。例如:PCollection。

在將 SQL 查詢應用於 PCollection 之前,集合中 Row 的數據格式必須要提前指定。 一旦 Beam SQL 指定了 管道中的類型是不能再改變的。PCollection 行中欄位 / 列的名稱和類型由 Schema 進行關聯定義。您可以使用 Schema.builder() 來創建 Schemas。

示例:

// Define the schema for the records.
Schema appSchema =
 Schema
   .builder()
   .addInt32Field("appId")
   .addStringField("description")
   .addDateTimeField("rowtime")
   .build();
// Create a concrete row with that type.
Row row =
 Row
   .withSchema(appSchema)
   .addValues(1, "Some cool app", new Date())
   .build();
// Create a source PCollection containing only that row
PCollection<Row> testApps =
 PBegin
   .in(p)
   .apply(Create
             .of(row)
             .withCoder(appSchema.getRowCoder()));

也可以是其他類型,不是直接是 Row,利用 PCollection通過應用 ParDo 可以將輸入記錄轉換為 Row 格式。如:


class AppPojo {
Integer appId;
String description;
Date timestamp;
}

PCollection<AppPojo> pojos = ...

PCollection<Row> apps = pojos
.apply(
    ParDo.of(new DoFn<AppPojo, Row>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
       
        AppPojo pojo = c.element();
       
         
          Row appRow =
                Row
                  .withSchema(appSchema)
                 .addValues(
                    pojo.appId,
                    pojo.description,
                   pojo.timestamp)
                  .build();
       
        c.output(appRow);
      }
    }));

Beam 在抽象 Flink 的時候已經把這個參數抽象出來了,在 Beam Flink 源碼解析中會提到。

3. 我這裡有個流批混合的場景,請問 Beam 是不是支持?

這個是支持的,因為批也是一種流,是一種有界的流。Beam 結合了 Flink,Flink dataset 底層也是轉換成流進行處理的。

4. Flink 流批寫程序的時候和 Beam 有什麼不同?底層是 Flink 還是 Beam?

打個比喻,如果 Flink 是 Lucene,那麼 Beam 就是 Solr,把 Flink 的 API 進行二次重寫,簡化了 API,讓大家使用更簡單、更方便。此外,Beam 提供了更多的數據源,這是 Flink 不能比的。當然,Flink 後期可能也會往這方面發展。

四.Apache Beam KafkaIO 源碼剖析 KafkaIO 對 kafka-clients 支持依賴情況

KafkaIO 是 Kafka 的 API 封裝,主要負責 Apache Kafka 讀取和寫入消息。如果想使用 KafkaIO,必須依賴 beam-sdks-java-io-kafka ,KafkaIO 同時支持多個版本的 Kafka 客戶端,使用時建議用高版本的或最新的 Kafka 版本,因為使用 KafkaIO 的時候需要包含 kafka-clients 的依賴版本。

Apache Beam  KafkaIO 對各個 kafka-clients 版本的支持情況如下表:

表 4-1  KafkaIO 與 kafka-clients 依賴關係表

Apache Beam V2.1.0 版本之前源碼中的 pom 文件都顯式指定了特定的 0.9.0.1 版本支持,但是從 V2.1.0 版本和 V2.1.1 兩個版本開始已經替換成了 kafka-clients 的 0.10.1.0 版本,並且源碼中提示 0.10.1.0 版本更安全。這是因為去年 Kafka 0.10.1.0 之前的版本曝出了安全漏洞。在 V2.2.0 以後的版本中,Beam 對 API 做了調整和更新,對之前的兩種版本都支持,不過需要在 pom 中引用的時候自己指定 Kafka 的版本。但是在 Beam V2.5.0 和 V2.6.0 版本,源碼中添加了以下提示:

* <h3>Supported Kafka Client Versions</h3>
* KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.
* <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions
* 0.9.x - 0.10.0.0 are also supported, but are deprecated and likely be removed in near future.
* Please ensure that the version included with the application is compatible with the version of
* your Kafka cluster. Kafka client usually fails to initialize with a clear error message in
* case of incompatibility.
*/

也就說在這兩個版本已經移除了對 Kafka 客戶端 0.10.1.0 以前版本的支持,舊版本還會支持,但是在以後不久就會刪除。所以大家在使用的時候要注意版本的依賴關係和客戶端的版本支持度。

如果想使用 KafkaIO,pom 必須要引用,版本跟 4-1 表中的對應起來就可以了。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>...</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>a_recent_version</version>
<scope>runtime</scope>
</dependency>

KafkaIO 源碼連結如下:

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

在 KafkaIO 裡面最主要的兩個方法是 Kafka 的讀寫方法。

pipeline.apply(KafkaIO.<Long, String>read()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("my_topic")    
      .withKeyDeserializer(LongDeserializer.class)
     .withValueDeserializer(StringDeserializer.class)
     
     
     
     
     .updateConsumerProperties(ImmutableMap.of("group.id",   "my_beam_app_1"))

     
     
     
      .withLogAppendTime()  
     
      .withReadCommitted()  
     
      .commitOffsetsInFinalize()  
     
     .withoutMetadata()
   )
   .apply(Values.<String>create())

1) 指定 KafkaIO 的模型,從源碼中不難看出這個地方的 KafkaIO類型是 Long 和 String 類型,也可以換成其他類型。

pipeline.apply(KafkaIO.<Long, String>read()

2) 設置 Kafka 集群的集群地址。

.withBootstrapServers("broker_1:9092,broker_2:9092")

3) 設置 Kafka 的主題類型,源碼中使用了單個主題類型,如果是多個主題類型則用 withTopics(List) 方法進行設置。設置情況基本跟 Kafka 原生是一樣的。

.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.

4) 設置序列化類型。Apache Beam KafkaIO 在序列化的時候做了很大的簡化,例如原生 Kafka 可能要通過 Properties 類去設置 ,還要加上很長一段 jar 包的名字。

Beam KafkaIO 的寫法:

.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)

原生 Kafka 的設置:

Properties props = new Properties();
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");

5) 設置 Kafka 的消費者屬性,這個地方還可以設置其他的屬性。源碼中是針對消費分組進行設置。

.updateConsumerProperties(ImmutableMap.of("group.id", my_beam_app_1"))

6) 設置 Kafka 吞吐量的時間戳,可以是默認的,也可以自定義。

7) 相當於 Kafka 中"isolation.level", "read_committed" ,指定 KafkaConsumer 只應讀取非事務性消息,或從其輸入主題中提交事務性消息。流處理應用程式通常在多個讀取處理寫入階段處理其數據,每個階段使用前一階段的輸出作為其輸入。通過指定 read_committed 模式,我們可以在所有階段完成一次處理。針對"Exactly-once" 語義,支持 Kafka 0.11 版本。

8) 設置 Kafka 是否自動提交屬性"AUTO_COMMIT",默認為自動提交,使用 Beam 的方法來設置。

set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize)
.commitOffsetsInFinalize()

9) 設置是否返回 Kafka 的其他數據,例如 offset 信息和分區信息,不用可以去掉。

10) 設置只返回 values 值,不用返回 key。例如  PCollection,而不是 PCollection。

.apply(Values.<String>create()) // PCollection<String>

寫操作跟讀操作配置基本相似,我們看一下具體代碼。

PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.<Long, String>write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withKeySerializer(LongSerializer.class)
    .withValueSerializer(StringSerializer.class)
   
   
    .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
   
    .withInputTimestamp()
   
    .withPublishTimestampFunction((elem, elemTs) -> ...)
   
    .withEOS(20, "eos-sink-group-id");
 );

下面這個是 Kafka 裡面比較重要的一個屬性設置,在 Beam 中是這樣使用的,非常簡單,但是要注意這個屬性.withEOS 其實就是 Kafka 中"Exactly-once"。

.withEOS(20, "eos-sink-group-id");

在寫入 Kafka 時完全一次性地提供語義,這使得應用程式能夠在 Beam 管道中的一次性語義之上提供端到端的一次性保證。它確保寫入接收器的記錄僅在 Kafka 上提交一次,即使在管道執行期間重試某些處理也是如此。重試通常在應用程式重新啟動時發生(如在故障恢復中)或者在重新分配任務時(如在自動縮放事件中)。Flink runner 通常為流水線的結果提供精確一次的語義,但不提供變換中用戶代碼的副作用。如果諸如 Kafka 接收器之類的轉換寫入外部系統,則這些寫入可能會多次發生。

在此處啟用 EOS 時,接收器轉換將兼容的 Beam Runners 中的檢查點語義與 Kafka 中的事務聯繫起來,以確保只寫入一次記錄。由於實現依賴於 runners checkpoint 語義,因此並非所有 runners 都兼容。Beam 中 FlinkRunner 針對 Kafka 0.11+ 版本才支持,然而 Dataflow runner 和 Spark runner 如果操作 kafkaIO 是完全支持的。

"Exactly-once" 在接收初始消息的時候,除了將原來的數據進行格式化轉換外,還經歷了 2 個序列化 - 反序列化循環。根據序列化的數量和成本,CPU 可能會漲的很明顯。通過寫入二進位格式數據(即在寫入 Kafka 接收器之前將數據序列化為二進位數據)可以降低 CPU 成本。

numShards——設置接收器並行度。存儲在 Kafka 上的狀態元數據,使用 sinkGroupId 存儲在許多虛擬分區中。一個好的經驗法則是將其設置為 Kafka 主題中的分區數。

sinkGroupId——用於在 Kafka 上將少量狀態存儲為元數據的組 ID。它類似於與 KafkaConsumer 一起使用的使用 groupID。每個作業都應使用唯一的 groupID,以便重新啟動 / 更新作業保留狀態以確保一次性語義。狀態是通過 Kafka 上的接收器事務原子提交的。有關更多信息,請參閱 KafkaProducer.sendOffsetsToTransaction(Map,String)。接收器在初始化期間執行多個健全性檢查以捕獲常見錯誤,以便它不會最終使用似乎不是由同一作業寫入的狀態。

FlinkRunner 對 Flink 支持依賴情況

Flink 是一個流和批處理的統一的計算框架,Apache Beam 跟 Flink API 做了無縫集成。在 Apache Beam 中對 Flink 的操作主要是 FlinkRunner.java,Apache Beam 支持不同版本的 flink 客戶端。我根據不同版本列了一個 Flink 對應客戶端支持表如下:

圖 5-1  FlinkRunner 與 Flink 依賴關係表

從圖 5-1 中可以看出,Apache Beam 對 Flink 的 API 支持的更新速度非常快,從源碼可以看到 2.0.0 版本之前的 FlinkRunner 是非常 low 的,並且直接拿 Flink 的實例做為 Beam 的實例,封裝的效果也比較差。但是從 2.0.0 版本之後 ,Beam 就像打了雞血一樣 API 更新速度特別快,拋棄了以前的冗餘,更好地跟 Flink 集成,讓人眼前一亮。

因為 Beam 在運行的時候都是顯式指定 Runner,在 FlinkRunner 源碼中只是成了簡單的統一入口,代碼非常簡單,但是這個入口中有一個比較關鍵的接口類 FlinkPipelineOptions。

請看代碼:


private final FlinkPipelineOptions options;

通過這個類我們看一下 Apache Beam 到底封裝了哪些 Flink 方法。

首先 FlinkPipelineOptions 是一個接口類,但是它繼承了 PipelineOptions、ApplicationNameOptions、StreamingOptions 三個接口類,第一個 PipelineOptions 大家應該很熟悉了,用於基本管道創建;第二個 ApplicationNameOptions 用於設置應用程式名字;第三個用於判斷是流式數據還是批數據。原始碼如下:

public interface FlinkPipelineOptions  extends
PipelineOptions, ApplicationNameOptions, StreamingOptions {

}

1) 設置 Flink Master 方法 ,這個方法用於設置 Flink 集群地址的 Master 地址。可以填寫 IP 和埠,或者是 hostname 和埠,默認 local 。當然測試也可以是單機的,在 Flink 1.4 利用 start-local.sh 啟動,而到了 1.5 以上就去掉了這個腳本,本地直接換成了 start-cluster.sh。大家測試的時候需要注意一下。


@Description( "Address of the Flink Master where the Pipeline should  be executed. Can"+ "[collection] or [auto].")
void setFlinkMaster(String value);

2) 設置 Flink 的並行數,屬於 Flink 高級 API 裡面的屬性。設置合適的 parallelism 能提高運算效率,太多了和太少了都不行。設置 parallelism 有多種方式,優先級為 api>env>p>file。

@Description("The degree of parallelism to be used when distributing operations onto workers.")
@Default.InstanceFactory(DefaultParallelismFactory.class)
Integer getParallelism();
void setParallelism(Integer value);

3) 設置連續檢查點之間的間隔時間(即當前的快照)用於容錯的管道狀態。

@Description("The interval between consecutive checkpoints (i.e.  snapshots of the current"
@Default.Long(-1L)
Long getCheckpointingInterval();
void setCheckpointingInterval(Long interval)

4) 定義一致性保證的檢查點模式,默認為"AT_LEAST_ONCE",在 Beam 的源碼中定義了一個枚舉類 CheckpointingMode,除了默認的"AT_LEAST_ONCE",還有"EXACTLY_ONCE"。

"AT_LEAST_ONCE":這個模式意思是系統將以一種更簡單地方式來對 operator 和 udf 的狀態進行快照:在失敗後進行恢復時,在 operator 的狀態中,一些記錄可能會被重放多次。

"EXACTLY_ONCE":這種模式意思是系統將以如下語義對 operator 和 udf(user defined function) 進行快照:在恢復時,每條記錄將在 operator 狀態中只被重現 / 重放一次。

@Description("The checkpointing mode that defines consistency guarantee.")
@Default.Enum("AT_LEAST_ONCE")
CheckpointingMode getCheckpointingMode();
void setCheckpointingMode(CheckpointingMode mode);

5) 設置檢查點的最大超時時間,默認為 20*60*1000(毫秒)=20(分鐘)。

@Description("The maximum time that a checkpoint may take before being discarded.")
@Default.Long(20 * 60 * 1000)
Long getCheckpointTimeoutMillis();
void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);

6) 設置重新執行失敗任務的次數,值為 0 有效地禁用容錯,值為 -1 表示使用系統默認值(在配置中定義)。

@Description(
"Sets the number of times that failed tasks are re-executed. "
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates "+ "that the system default value (as defined in the configuration) should be used.")
@Default.Integer(-1)
Integer getNumberOfExecutionRetries();
void setNumberOfExecutionRetries(Integer retries);

7) 設置執行之間的延遲,默認值為 -1L。

@Description(
    "Sets the delay between executions. A value of {@code -1} "
        + "indicates that the default value should be used.")
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);

8) 設置重用對象的行為。

@Description("Sets the behavior of reusing objects.")
@Default.Boolean(false)
Boolean getObjectReuse();
void setObjectReuse(Boolean reuse);

9) 設置狀態後端在計算期間存儲 Beam 的狀態,不設置從配置文件中讀取默認值。注意:僅在執行時適用流媒體模式。

@Description("Sets the state backend to use in streaming mode. "
@JsonIgnore
AbstractStateBackend getStateBackend();
void setStateBackend(AbstractStateBackend stateBackend);

10) 在 Flink Runner 中啟用 / 禁用 Beam 指標。

@Description("Enable/disable Beam metrics in Flink Runner")
@Default.Boolean(true)
BooleangetEnableMetrics();
voidsetEnableMetrics(BooleanenableMetrics);

11) 啟用或禁用外部檢查點,與 CheckpointingInterval 一起使用。

@Description(
"Enables or disables externalized checkpoints."
+"Works in conjunction with CheckpointingInterval")
@Default.Boolean(false)
BooleanisExternalizedCheckpointsEnabled();
voidsetExternalizedCheckpointsEnabled(BooleanexternalCheckpoints);

12) 設置當他們的 Wartermark 達到 + Inf 時關閉源,Watermark 在 Flink 中其中一個作用是根據時間戳做單節點排序,Beam 也是支持的。

@Description("If set, shutdown sources when their watermark reaches +Inf.")
@Default.Boolean(false)
BooleanisShutdownSourcesOnFinalWatermark();
voidsetShutdownSourcesOnFinalWatermark(BooleanshutdownOnFinalWatermark);

剩餘兩個部分這裡不再進行翻譯,留給大家去看源碼。

本節通過解讀一個真正的 KafkaIO 和 Flink 實戰案例,幫助大家更深入地了解 Apache Beam KafkaIO 和 Flink 的運用。

Apache Beam 外部數據流程圖

設計思路:Kafka 消息生產程序發送 testmsg 到 Kafka 集群,Apache Beam 程序讀取 Kafka 的消息,經過簡單的業務邏輯,最後發送到 Kafka 集群,然後 Kafka 消費端消費消息。

Apache Beam 內部數據處理流程圖

Apache Beam 程序通過 kafkaIO 讀取 Kafka 集群的數據,進行數據格式轉換。數據統計後,通過 KafkaIO 寫操作把消息寫入 Kafka 集群。最後把程序運行在 Flink 的計算平臺上。

Kafka 集群和 Flink 單機或集群配置,大家可以去網上搜一下配置文章,操作比較簡單,這裡就不贅述了。

1)新建一個 Maven 項目

2)在 pom 文件中添加 jar 引用

<dependency>
 <groupId>org.apache.beam</groupId>
 <artifactId>beam-sdks-java-io-kafka</artifactId>
 <version>2.4.0</version>
</dependency>
<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>
</dependency>
<dependency>
 <groupId>org.apache.beam</groupId>
 <artifactId>beam-runners-core-java</artifactId>
 <version>2.4.0</version>
</dependency>
<dependency>
 <groupId>org.apache.beam</groupId>
 <artifactId>beam-runners-flink_2.11</artifactId>
 <version>2.4.0</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.5.2</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_2.11</artifactId>
 <version>1.5.2</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-core</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-runtime_2.11</artifactId>
 <version>1.5.2</version>
 
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.11</artifactId>
 <version>1.5.2</version>
 
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-metrics-core</artifactId>
 <version>1.5.2</version>
 
</dependency>

3)新建 BeamFlinkKafka.java 類

4)編寫以下代碼:

public static void main(String[] args) {

PipelineOptions options = PipelineOptionsFactory.create();

options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

PCollection<KafkaRecord<String, String>> lines =
pipeline.apply(KafkaIO.<String,

String>read().withBootstrapServers("192.168.1.110:11092,192.168.1.119:11092,192.168.1.120:11092")
    .withTopic("testmsg")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")));

PCollection<String> kafkadata = lines.apply("Remove Kafka Metadata", ParDo.of(new DoFn<KafkaRecord<String, String>, String>() {
private static final long serialVersionUID = 1L;
 @ProcessElement
 public void processElement(ProcessContext ctx) {
  System.out.print("輸出的分區為 ----:" + ctx.element().getKV());
  ctx.output(ctx.element().getKV().getValue());
 }
}));
PCollection<String> windowedEvents = kafkadata.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))));
PCollection<KV<String, Long>> wordcount = windowedEvents.apply(Count.<String>perElement());
PCollection<String> wordtj = wordcount.apply("ConcatResultKVs", MapElements.via(
new SimpleFunction<KV<String, Long>, String>() {
 private static final long serialVersionUID = 1L;
  @Override
  public String apply(KV<String, Long> input) {
  System.out.print("進行統計:" + input.getKey() + ": " + input.getValue());
    return input.getKey() + ": " + input.getValue();
   }
  }));
wordtj.apply(KafkaIO.<Void, String>write()  .withBootstrapServers("192.168.1.110:11092,192.168.1.119:11092,192.168.1.120:11092")
  .withTopic("senkafkamsg")
 
  .withValueSerializer(StringSerializer.class)
 
 
  .values()
);
pipeline.run().waitUntilFinish();

5)打包 jar,本示例是簡單的實戰,並沒有用 Docker,Apache Beam 新版本是支持 Docker 的。

6)通過 Apache Flink Dashboard 提交 job

7)查看結果

程序接收的日誌如下:

本次實戰在源碼分析中已經做過詳細解析,在這裡不做過多的描述,只選擇部分問題再重點解釋一下。此外,如果還沒有入門,甚至連管道和 Runner 等概念都還不清楚,建議先閱讀本系列的第一篇文章 《Apache Beam 實戰指南之基礎入門》。

1.FlinkRunner 在實戰中是顯式指定的,如果想設置參數怎麼使用呢?其實還有另外一種寫法,例如以下代碼:








options.setRunner(FlinkRunner.class);

2.Kafka 有三種數據讀取類型,分別是 「earliest 」,「latest 」,「none 」,分別的意思代表是:

當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,從頭開始消費 。

當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據 。

topic 各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常。

.updateConsumerProperties(ImmutableMap.<String,Object>of("auto.offset.reset", "earliest")));

3. 實戰中我自己想把 Kafka 的數據寫入,key 不想寫入,所以出現了 Kafka 的 key 項為空,而 values 才是真正發送的數據。所以開始和結尾要設置個.values(),如果不加上就會報錯。

KafkaIO.<Void, String>write()
.values()

隨著 AI 和 loT 的時代的到來,各個公司不同結構、不同類型、不同來源的數據進行整合的成本越來越高。Apache Beam 技術的統一模型和大數據計算平臺特性優雅地解決了這一問題,相信在 loT 萬億市場中,Apache Beam 將會發揮越來越重要的角色。

張海濤,目前就職於海康威視雲基礎平臺,負責雲計算大數據的基礎架構設計和中間件的開發,專注云計算大數據方向。Apache Beam 中文社區發起人之一,如果想進一步了解最新 Apache Beam 動態和技術研究成果,請加微信 cyrjkj 入群共同研究和運用。

傳送門:系列文章第一篇《Apache Beam 實戰指南之基礎入門》

如果你喜歡這篇文章,或希望看到更多類似優質報導,記得給我留言和點讚哦!

相關焦點

  • Apache Beam實戰指南 | 手把手教你玩轉大數據存儲HdfsIO
    關於 Apache Beam 實戰指南系列文章隨著大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。近年來湧現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。
  • Apache Flink 1.5.5 和 1.6.2 發布,通用數據處理平臺
    </groupId>  <artifactId>flink-java</artifactId>  <version>1.5.5</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId
  • Flink寫入hive測試
    >      <groupId>org.apache.flink</groupId>      <artifactId>flink-core</artifactId>      <version>${flink.version}</version>      <!
  • flink-1.12.0 upsert-kafka connector demo
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java
  • 一篇文章讓深入理解Flink SQL 時間特性
    _import org.apache.flink.table.api.Tableimport org.apache.flink.table.api.scala._import org.apache.flink.table.api.EnvironmentSettingsimport org.apache.flink.table.api.scala.
  • 布客·ApacheCN 編程/後端/大數據/人工智慧學習資源 2020.6
    utm_source=home)C笨辦法學 C 中文版(https://github.com/apachecn/lcthw-zh?utm_source=home)PHP手把手教你寫 PHP 協程擴展(https://github.com/apachecn/study?
  • CoProcessFunction實戰三部曲之二:狀態處理
    GitHubhttps://github.com/zq2599/blog_demos內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;本篇概覽本文是《CoProcessFunction實戰三部曲
  • CoProcessFunction實戰三部曲之三:定時器和側輸出
    ;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.co.CoProcessFunction
  • Apache Flink 誤用之痛
    郵件列表:user@flink.apache.com/user-zh@flink.apache.orgStack Overflow:www.stackoverflow.com2.如果不在乎,你可以沒有 Checkpoint。第2個問題,是否在乎結果的正確性?在很多的場景裡面,我們非常關注結果的正確性,比如金融領域,但是另外一些場景比如監控或其他簡單的使用場景僅需要一個概要的數據統計。
  • Flink最難知識點再解析 | 時間/窗口/水印/遲到數據處理
    org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.scala.function.WindowFunctionimport org.apache.flink.streaming.api.scala.
  • Apache 年度報告出爐,HBase、Flink、Beam 成最活躍開源項目
    Apache 項目https://projects.apache.org/項目和子項目總數:339 個。頂級項目:206 個。Apche 孵化器的孵化項目:46 個。社區 / 人員http://home.apache.org/Apache 提交者:7203 名(活躍用戶 7038 名)。Apache 軟體基金會成員(個人):765 名。
  • 開篇|揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?
    本文為 Apache Flink 新版本重大功能特性解讀之 Flink SQL 系列文章的開篇,Flink SQL 系列文章由其核心貢獻者們分享,涵蓋基礎知識、實踐、調優、內部實現等各個方面,帶你由淺入深地全面了解 Flink SQL。1.
  • 《使命召喚手遊》實戰小屋地圖攻略 手把手教你實戰小屋怎麼打
    《使命召喚手遊》實戰小屋地圖攻略 手把手教你實戰小屋怎麼打時間:2020-12-23 18:55   來源:遊俠網   責任編輯:沫朵 川北在線核心提示:原標題:《使命召喚手遊》實戰小屋地圖攻略 手把手教你實戰小屋怎麼打 使命召喚手遊實戰小屋怎麼打?
  • 快手Before社區是什麼 手把手教你玩轉before社區
    快手Before社區是什麼 手把手教你玩轉before社區時間:2020-04-29 15:31   來源:騰牛網    責任編輯:沫朵 川北在線核心提示:原標題:快手Before社區是什麼 手把手教你玩轉before社區 Before社區是快手全新上線的新產品,主要是瞄準文藝青年的互動交友社區,相信還有很多人不是很了解
  • 布客·ApacheCN 編程/後端/大數據/人工智慧學習資源 2020.8
    utm_source=home)C笨辦法學 C 中文版(https://github.com/apachecn/lcthw-zh?utm_source=home)PHP手把手教你寫 PHP 協程擴展(https://github.com/apachecn/study?
  • Word在線翻譯怎麼用 手把手教你玩轉Word在線翻譯功能
    Word在線翻譯怎麼用 手把手教你玩轉Word在線翻譯功能時間:2018-02-07 16:10   來源:系統天堂   責任編輯:沫朵 川北在線核心提示:原標題:Word在線翻譯怎麼用 手把手教你玩轉Word在線翻譯功能 Word在線翻譯怎麼用?
  • Flink保證端到端exactly-once語義(也適用於kafka)
    作者:Moon_Storm連結:https:英文連結:https:來源:簡書2017年12月,apache
  • Flink 端到端 Exactly-once 機制剖析
    但是,flink有很多外接系統,比如將數據寫到kafka,一旦作業失敗重啟,offset重置,會消費舊數據,從而將重複的結果寫到kafka。如下圖:      這個時候,僅靠系統本身是無法保證exactly-once的。系統之間的數據一致性一般要靠2PC協議來保證,flink的TwoPhaseCommitSinkFunction也是基於此實現的。
  • 澳門深度遊指南 手把手教你玩轉新城區
    長此以往,對澳門也可以說是非常的了解了,比如那裡好玩、哪裡好吃、哪裡好逛...今天就跟要去打卡澳門的小可愛,分享一下我往返澳陸多次後,整理出澳門深度遊攻略~內容包括:事前準備、交通、住宿、購物、美食、網紅打卡點...手把手教你玩轉澳門新城區!感興趣的抓緊收藏。