使用Spring Cloud Data Flow 來實現數據流處理

2020-12-14 騰訊網

隨著雲原生、微服務應用等的概念的逐漸深入人心,很多企業用戶都已經在使用Spring Boot和Spring Cloud等流行的開源技術框架來開發Java微服務,實現很多在線事務處理(OLTP)類的業務應用。

那麼,對於數據密集型(Data Intensive)的應用,比如定期執行的批處理(Batch Processing)或持續的實時數據流處理(Stream Processing),Spring社區是否也有對應的開源項目可以方便大家日常的工作呢?

確實有這樣的一個框架,那就是Spring Cloud Data Flow(SCDF),相對於大名鼎鼎的Spring Boot和Spring Cloud,在國內大家可能還不太熟悉SCDF,今天我們就給大家介紹一下。

SCDF目前(截止2020/6)的最新版為2.5,官網地址為: https://dataflow.spring.io/

概述

SCDF中的數據微服務應用仍然是Spring Boot應用,通過Spring Cloud Stream抽象了流處理/消息機制,Spring Cloud Task抽象了批處理任務,讓開發人員專注於業務邏輯的開發,而不用關心太多底層的細節。

SCDF的主要運行組件包括Data Flow Server和Skipper Server。運行數據可以保存在主流關係資料庫如MySQL, PostgreSQL, Oracle, DB2, SQLServer等,流處理模式還需要依賴RabbitMQ或Kafka。

SCDF像Spring Boot一樣,可以直接用Java命令運行起來,也可以以容器的方式運行在Kubernetes集群或Cloud Foundry平臺(Tanzu Application Service)上。Spring Cloud Skipper Server負責對接運行平臺,並安裝、部署、伸縮、升級或回滾SCDF應用。

SCDF提供可視化界面Dashboard便於進行日常管理,如註冊應用,編排數據流,執行數據流,查看執行結果和歷史等。當然也可以利用API或DSL CLI的方式進行交互。

常見的數據處理

大家可能遇到這樣的一些業務場景:

關係資料庫中的「熱」數據需要緩存一份以提升讀取性能,在數據更新後還能保持緩存數據的持續更新。

採用內存數據網格如GemFire處理大規模的數據更新,以避免關係資料庫成為性能瓶頸,但GemFire中的數據需要及時寫回關係資料庫,以保證數據的持續更新。

當A微服務完成某個業務操作後,不是採用緊耦合的API調用B服務,而是發送消息,所有訂閱這個事件消息的服務都可以收到通知,繼續後續的處理,形成消息流轉的路徑。

IoT傳感器不斷上傳最新的數據,流數據處理平臺需要計算移動時間窗口內的計數或統計,發現異常模式後,發出通知告警。

每天凌晨1:00利用CDC (變化數據獲取機制)將當天業務數據導出生成CSV文件,上傳到文件伺服器;下遊系統在2:00從文件伺服器下載文件,解析後導入到自己的資料庫。

給數據打標籤後,提供給機器學習模型以供訓練。

……

可以發現,數據處理大致都包括如下的基本步驟:

Source: 從數據源讀取解析數據,可以是文件,資料庫,NoSQL資料庫,消息等

Process: 對數據進行處理加工,如校驗,去重,格式轉換,數據增強/豐富化等

Sink: 保存處理完的數據,或發送處理完畢的事件消息,或調用第三方系統API等

SCDF的口號是「連接一切」,有超過60個現成的組件以供數據集成,大致包括如下類別:

文件: File, FTP/SFTP, log, syslog, S3, hdfs

資料庫: JDBC/SQL, cdc-debezium, pg-copy (Greenplum)

NoSQL: 文檔資料庫MongoDB,內存數據網格GemFire,緩存Redis,KV資料庫Cassandra

消息中間件:JMS, RabbitMQ, MQTT, Kafka, ……

通信協議:HTTP, TCP, WebSocket, grpc, mail

時間: time, trigger

統計:計數,過濾,分支,合併,轉換

機器學習:TensorFlow, PMML, 圖片識別

數據處理模式

SCDF同時支持批處理和流處理這兩種最常見的數據處理模式。

批處理

基於Spring Cloud Task實現批處理任務,由Spring Cloud Scheduler基於cron時間設置觸發執行(也可手工觸發執行)。在任務處理結果和狀態保存到資料庫後,任務會結束運行。採用Spring Batch編程模型(Job/Step)可以處理批處理常見場景如任務執行中間出錯後從斷點繼續運行,事務管理,以及任務執行進度、統計等。

流處理

流處理基於Spring Cloud Stream模型,需要綁定(bind) RabbitMQ或Kafka作為底層的消息通信總線,也支持AWS Kinesis, Azure Event Hubs等。流處理的主要模型即為Source-Process-Sink,有豐富的現成組件可以使用。

管理與監控

如果數據處理流比較簡單,可以直接通過類似於Unix的管道方式的DSL進行定義:

流處理:

ohttp| jdbc, 表示從http流中讀取數據,經過一定的處理後,通過jdbc寫入資料庫

批處理:

otask1 && task2,表示順序執行task1, task2.

otask1 || task2, 表示並行執行task1, task2.

對於複雜的數據處理流程,可以像編排工作流一樣去設計數據流,支持串行、並行和條件執行。在SCDF的管理界面Dashboard以」拖拉」的方式所見即所得:

SCDF還集成了Prometheus/InfluxDB/Grafana,方便日常運維監控。

實戰樣例

如果已經心動,想要動手實戰一下的話,可以參照https://dataflow.spring.io/docs/stream-developer-guides/streams/data-flow-stream/去試一試:

場景

移動通信公司會收集用戶使用的電話通話時長和數據服務的流量。基於定價策略,計算所產生的費用,並出具帳單。

設計

步驟

1. 安裝Spring Cloud Data Flow,如選擇本地安裝

2. 因為以上是流處理場景,還需要安裝RabbitMQ

3. (可選)從https://github.com/spring-cloud/spring-cloud-dataflow-samples下載並構建這個示例spring-cloud-dataflow-samples/dataflow-website/stream-developer-guides/streams/standalone-stream-rabbitmq/,如果不想本地構建,可以直接使用Maven庫中已經構建好的包或docker鏡像庫中的鏡像

4. 訪問管理界面:http://localhost:9393/dashboard(或使用Shell命令)

a. 註冊三個應用,注意各自的應用類型

b. 創建流(Stream),用DSL可以簡單的表示為:usage-detail-sender | usage-cost-processor | usage-cost-logger

c. 部署流,運行流處理應用

d. 從運行時(runtime)中觀察日誌輸出{"userId": "user1", "callCost": "24.6", "dataCost": "19.25" }

具體步驟可參見

https://dataflow.spring.io/docs/stream-developer-guides/streams/data-flow-stream/。

如果想研究更多實例的話,可以從https://docs.spring.io/spring-cloud-dataflow-samples/docs/current/reference/htmlsingle/開始。

總結與展望

如果企業的數據處理場景還沒有專門的ETL(抽取轉換加載)工具或Spark/Flink團隊,那麼採用SCDF可以充分利用開發團隊現有的技能和經驗,基於熟悉的Java語言和Spring Boot開發框架,以微服務架構和CI/CD工具實現常見的數據處理任務。

當然,如果企業需要用Python, R語言進行數據處理,SCDF也同樣支持(以docker鏡像方式)。如果企業已經在嘗試使用Event Sourcing和CQRS架構,那麼SCDF也可以作為其中的關鍵組件發揮作用。

SCDF屬於Tanzu產品家族中的」構建(Build)」部分,隨著Tanzu產品的不斷整合和豐富,SCDF很快就會集成Tanzu Observability by Wavefront提供更好的可觀察性,並提供Paketo Cloud native Buildpack 以實現自動構建鏡像,以及加入Bitnami/Tanzu Application Catelog提供經過認證的產品安裝包Helm 3 Chart.

以上只是嚮導式的入門級介紹,如果你對SCDF感興趣,請立即安裝和體驗:https://dataflow.spring.io/docs/installation/

作者:羅治年 (lzhinian@vmware.com)

羅治年是VMware應用現代化部門的資深雲原生應用架構師,有近20年的軟體研發和架構設計經驗,曾先後就職於Pivotal、迪士尼、畢博管理諮詢、埃森哲等,長期從事企業IT規劃,企業級系統架構設計,及系統研發和實施管理等工作。近期主要專注於採用敏捷開發方法實現微服務雲原生應用的設計和開發,以及傳統應用現代化並往雲上遷移擁有豐富的實戰經驗。他是認證的Spring Professional, Kubernetes 管理員 (CKA) 和Cloud Foundry 專家。

誠邀您參加VMware於7月16日下午14:30舉辦的「VMware原生安全解決方案」網絡論壇。點擊【閱讀原文】了解詳情,立即註冊參會!

相關焦點

  • SpringBoot(五) :spring data jpa 的使用
    spring data jpaSpring Data JPA 是 Spring 基於 ORM 框架、JPA 規範的基礎上封裝的一套JPA應用框架,可使開發者用極簡的代碼即可實現對數據的訪問和操作。它提供了包括增刪改查等在內的常用功能,且易於擴展!學習並使用 Spring Data JPA 可以極大提高開發效率!
  • 使用redis在SpringCloud getway中進行速率限制
    為了使用Redis處理速率限制器,我們還需要向spring-boot-starter-data-redis-reactive啟動器添加依賴項。其他依賴關係用於測試目的。mockserver測試容器內提供的模塊。它負責模擬目標服務。反過來,該庫在測試期間mockserver-client-java用於與mockserver容器集成。
  • Spring Cloud 中 Zuul 網關到底有何牛逼之處?竟然這麼多人在用!
    zuul網關其底層使用ribbon來實現請求的路由,並內置Hystrix,可選擇性提供網關fallback邏輯。使用zuul的時候,並不推薦使用Feign作為application client端的開發實現。畢竟Feign技術是對ribbon的再封裝,使用Feign本身會提高通訊消耗,降低通訊效率,只在服務相互調用的時候使用Feign來簡化代碼開發就夠了。
  • Sentinel Dashboard(基於1.8.1)流控規則持久化到Nacos——涉及...
    那麼就會出現很嚴重的問題(流控規則達不到預期,配置數據不一致),所以推薦使用Sentinel Dashboard統一界面進行配置管理流控規則正因為Sentinel Dashboard當前版本(截至目前為止是1.8.1-SNAPSHOT)暫不支持,但是可以通過改造部分源碼實現此功能,具體請看下面介紹。
  • springboot(五):spring data jpa的使用
    spring data jpaSpring Data JPA 是 Spring 基於 ORM 框架、JPA 規範的基礎上封裝的一套JPA應用框架,可使開發者用極簡的代碼即可實現對數據的訪問和操作。它提供了包括增刪改查等在內的常用功能,且易於擴展!學習並使用 Spring Data JPA  可以極大提高開發效率!
  • 基於Spring Boot和Spring Cloud實現微服務架構
    Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • 實現微服務架構最流行Style,Spring Boot+Spring Cloud
    ,具體來說當你使用maven dependency引入spring jar包時它就在工作了。Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。
  • 基於 Spring Boot 和 Spring Cloud 實現微服務架構
    Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • 基於Spring Boot和Spring Cloud實現微服務架構學習
    Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring Framework:即通常所說的spring 框架,是一個開源的Java/Java EE全功能棧應用程式框架,其它spring項目如spring boot也依賴於此框架。
  • 厲害了,教你用 Spring Cloud 實現微服務
    ,具體來說當你使用maven dependency引入spring jar包時它就在工作了。Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。
  • Spring Cloud-Hystrix 斷路器
    在Spring Cloud Netflix棧中,每個微服務都以HTTP接口的形式暴露自身服務,因此在調用遠程服務時就必須使用到HTTP客戶端。我們可以使用JDK原生的URLConnection、Apache的Http Client、Netty的異步HTTP Client,還有之前我們使用到Spring的RestTemplate,這些都可以實現遠程調用。
  • 5w 字 | 172 圖 | 超級賽亞級 Spring Cloud 實戰
    [0].data-id=datasource.ymlspring.cloud.nacos.config.extension-configs[0].group=devspring.cloud.nacos.config.extension-configs[0].refresh=truespring.cloud.nacos.config.extension-configs[
  • Spring Cloud Gateway實現Token校驗
    RequestRateLimiter GatewayFilter FactoryRequestRateLimiter GatewayFilter 用一個RateLimiter實現來決定當前請求是否被允許處理。如果不被允許,默認將返回一個HTTP 429狀態,表示太多的請求。
  • 安全的Spring Cloud配置
    如果您對一些基礎知識感興趣,請參閱我以前的文章,其中涉及使用Spring Cloud Config進行微服務配置。本文涵蓋的主題是:敏感數據的加密和解密在伺服器端設置SSL配置客戶端上的SSL連接1.加密和解密如果使用JDK 8或更低版本,則首先需要下載並安裝Oracle提供的Java密碼學擴展(JCE)。
  • Spring Cloud Alibaba 發布 GA 版本,新增 4 個模塊
    版本概要概要: 增加了 4 個新的模塊:spring-cloud-alibaba-dubbo、spring-cloud-alibaba-seata、spring-cloud-alibaba-sentinel-zuul 以及 spring-cloud-alicloud-sms。
  • 初識Spring Cloud Stream,什麼是消息驅動微服務框架
    通過使用 Spring Cloud Stream 可以有效簡化開發人員對消息中間件的使用複雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。Spring Cloud Stream 基於 Spring Boot 實現,所以它秉承了 Spring Boot 的優點,自動化配置功能可以幫助我們快速上手使用。
  • Springcloud序之Springboot2x模塊化+rest assured+AES加解密實現
    本文主要使用Springboot進行多模塊項目的實現方式,並結合接口測試常用框架rest assured以及AES加解密來綜合講解多模塊項目的一些常用功能的實現。並作為後續Springcloud的序篇,後面我會以模塊化項目的方式來逐步實現Springcloud各個組件的講解與代碼演示,以期和大家一起對Springcloud常用功能有更深入的理解。
  • Spring Data Redis使用
    本文我們就來看看這個東西。快照持久化9.Redis之AOF持久化10.Redis主從複製(一)11.Redis主從複製(二)12.Redis集群搭建13.Jedis使用Spring Data Redis介紹Spring Data Redis是Spring官方推出,可以算是Spring框架集成Redis操作的一個子框架,封裝了Redis的很多命令,可以很方便的使用Spring
  • 基於Spring Boot+Cloud構建微雲架構
    Spring Boot:旨在簡化創建產品級的 Spring 應用和服務,簡化了配置文件,使用嵌入式web伺服器,含有諸多開箱即用微服務功能,可以和spring cloud聯合部署。Spring XD:是一種運行時環境(伺服器軟體,非開發框架),組合spring技術,如spring batch、spring boot、spring data,採集大數據並處理。
  • 教程 | 使用MNIST數據集,在TensorFlow上實現基礎LSTM網絡
    長短期記憶(LSTM)是目前循環神經網絡最普遍使用的類型,在處理時間序列數據時使用最為頻繁。關於 LSTM 的更加深刻的洞察可以看看這篇優秀的博客:http://colah.github.io/posts/2015-08-Understanding-LSTMs/。