Spring Batch 批處理框架技巧

2021-03-02 動力節點Java學院

(點擊上方公眾號,可快速關注)

學習問題歡迎留言或對話框諮詢

你好,歡迎來到Java基礎知識分解站,今天小編為猿猿們整理了Spring批處理框架的內容,掌握這些知識,可以幫你省去一些造輪子的過程,提高開發效率。本文由博主姚兆峰分享,小編整理後推送,希望對你的工作有幫助。

在大型的企業應用中,或多或少都會存在大量的任務需要處理,如郵件批量通知所有將要過期的會員等等。而在批量處理任務的過程中,又需要注意很多細節,如任務異常、性能瓶頸等等。那麼,使用一款優秀的框架總比我們自己重複地造輪子要好得多一些。

我所在的物聯網雲平臺部門就有這麼一個需求,需要實現批量下發命令給百萬設備。為了防止枯燥乏味,下面就讓我們先通過Spring Batch框架簡單地實現一下這個功能,再來詳細地介紹這款框架!

引入依賴

首先我們需要引入對Spring Batch的依賴,在pom.xml文件加入下面的代碼:

<dependency>

    <groupId>org.springframework.batch</groupId>

    <artifactId>spring-batch-core</artifactId>

    <version>3.0.8.RELEASE</version>

</dependency>


裝載Bean

其次,我們需要在resources目錄下,創建applicationContext.xml文件,用於自動注入我們需要的類:

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="

 

http://www.springframework.org/schema/beans

 

 

http://www.springframework.org/schema/beans/spring-beans.xsd">

 

    <!-- 事務管理器 -->

    <bean id="transactionManager"/>

 

    <!-- 任務倉庫 -->

    <bean id="jobRepository">

        <property name="transactionManager" ref="transactionManager"/>

    </bean>

 

    <!-- 任務加載器 -->

    <bean id="jobLauncher">

        <property name="jobRepository" ref="jobRepository"/>

    </bean>

 

</beans>

有了上面聲明的transactionManager、jobRepository、jobLauncher,我們就可以執行批量任務啦!不過,我們還需要創建一個任務。在Spring Batch框架中,一個任務Job由一個或者多個步驟Step,而步驟又由讀操作Reader、處理操作Processor、寫操作Writer組成,下面我們分別創建它們。

創建Reader

既然是讀操作,那麼肯定要有能讀的數據源,方便起見,我們直接在resources目錄下創建一個batch-data.csv文件,內容如下:

1,PENDING

2,PENDING

3,PENDING

4,PENDING

5,PENDING

6,PENDING

7,PENDING

8,PENDING

9,PENDING

10,PENDING

非常簡單,其中第一列代表著命令的id,第二列代表著命令的當前狀態。也就是說,現在有10條緩存的命令,需要下發給設備。

讀操作需要實現ItemReader<T>接口,框架提供了一個現成的實現類FlatFileItemReader。使用該類需要設置Resource和LineMapper。Resource代表著數據源,即我們的batch-data.csv文件;LineMapper則表示如何將文件的每行數據轉成對應的DTO對象。

創建DTO對象

由於我們的數據源是命令數據,所以我們需要創建一個DeviceCommand.java文件,代碼如下:

public class DeviceCommand {

 

    private String id;

 

    private String status;

 

    public String getId() {

        return id;

    }

 

    public void setId(String id) {

        this.id = id;

    }

 

    public String getStatus() {

        return status;

    }

 

    public void setStatus(String status) {

        this.status = status;

    }

}

自定義LineMapper

我們需要自己實現一個LineMapper實現類,用於將batch-data.csv文件的每行數據,轉成程序方便處理的DeviceCommand對象。

public class HelloLineMapper implements LineMapper<DeviceCommand> {

 

    @Override

    public DeviceCommand mapLine(String line, int lineNumber) throws Exception {

 

        // 逗號分割每一行數據

        String[] args = line.split(",");

         

        // 創建DeviceCommand對象

        DeviceCommand deviceCommand = new DeviceCommand();

         

        // 設置id值到對象中

        deviceCommand.setId(args[0]);

         

        // 設置status值到對象中

        deviceCommand.setStatus(args[1]);

         

        // 返回對象

        return deviceCommand;

 

    }

}

創建Processor

讀完數據後,我們就需要處理數據了。既然我們前面從文件裡讀取了待下發的命令,那麼在這裡下發命令給設備是最好的時機。處理操作需要實現ItemProcessor<I, O>接口,我們自己實現一個HelloItemProcessor.java即可,代碼如下:

public class HelloItemProcessor implements ItemProcessor<DeviceCommand, DeviceCommand> {

 

    @Override

    public DeviceCommand process(DeviceCommand deviceCommand) throws Exception {

 

        // 模擬下發命令給設備

        System.out.println("send command to device, id=" + deviceCommand.getId());

 

        // 更新命令狀態

        deviceCommand.setStatus("SENT");

 

        // 返回命令對象

        return deviceCommand;

         

    }

     

}

創建Writer

處理完數據後,我們需要更新命令狀態到文件裡,用於記錄我們已經下發。與讀文件類似,我們需要實現ItemWriter<T>接口,框架也提供了一個現成的實現類FlatFileItemWriter。使用該類需要設置Resource和LineAggregator。Resource代表著數據源,即我們的batch-data.csv文件;LineAggregator則表示如何將DTO對象轉成字符串保存到文件的每行。

自定義LineAggregator

我們需要自己實現一個LineAggregator實現類,用於將DeviceCommand對象轉成字符串,保存到batch-data.csv文件。

public class HelloLineAggregator implements LineAggregator<DeviceCommand> {

 

    @Override

    public String aggregate(DeviceCommand deviceCommand) {

 

        StringBuffer sb = new StringBuffer();

        sb.append(deviceCommand.getId());

        sb.append(",");

        sb.append(deviceCommand.getStatus());

        return sb.toString();

 

    }

}

主程序

那麼,完事具備,只欠東風!接下面我們在主程序Main.java裡實現我們的批量命令下發功能!代碼如下:

public class Main {

 

 

    public static void main(String[] args) throws Exception {

 

        // 加載上下文

        String[] configLocations = {"applicationContext.xml"};

        ApplicationContext applicationContext = new ClassPathXmlApplicationContext(configLocations);

 

        // 獲取任務啟動器

        JobLauncher jobLauncher = applicationContext.getBean(JobLauncher.class);

        JobRepository jobRepository = applicationContext.getBean(JobRepository.class);

        PlatformTransactionManager transactionManager = applicationContext.getBean(PlatformTransactionManager.class);

 

        // 創建reader

        FlatFileItemReader<DeviceCommand> flatFileItemReader = new FlatFileItemReader<>();

        flatFileItemReader.setResource(new FileSystemResource("src/main/resources/batch-data.csv"));

        flatFileItemReader.setLineMapper(new HelloLineMapper());

 

        // 創建processor

        HelloItemProcessor helloItemProcessor = new HelloItemProcessor();

 

        // 創建writer

        FlatFileItemWriter<DeviceCommand> flatFileItemWriter = new FlatFileItemWriter<>();

        flatFileItemWriter.setResource(new FileSystemResource("src/main/resources/batch-data.csv"));

        flatFileItemWriter.setLineAggregator(new HelloLineAggregator());

 

        // 創建Step

        StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);

        Step step = stepBuilderFactory.get("step")

                                      .<DeviceCommand, DeviceCommand>chunk(1)

                                      .reader(flatFileItemReader)       // 讀操作

                                      .processor(helloItemProcessor)    // 處理操作

                                      .writer(flatFileItemWriter)       // 寫操作

                                      .build();

 

        // 創建Job

        JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);

        Job job = jobBuilderFactory.get("job")

                                   .start(step)

                                   .build();

 

        // 啟動任務

        jobLauncher.run(job, new JobParameters());

 

    }

}

執行main方法之後,屏幕將會輸出下面信息:

send command to device, id=1

send command to device, id=2

send command to device, id=3

send command to device, id=4

send command to device, id=5

send command to device, id=6

send command to device, id=7

send command to device, id=8

send command to device, id=9

send command to device, id=10

再查看batch-data.csv文件,將會發現命令狀態全部更新為SENT:

1,SENT

2,SENT

3,SENT

4,SENT

5,SENT

6,SENT

7,SENT

8,SENT

9,SENT

10,SENT

至此,我們的批量命令下發全部成功!可以發現,使用Spring Batch框架來實現批處理非常的輕量,當然這只是它所有功能裡的冰山一角。

Spring Batch在官網是這樣一句話介紹自己的:

A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款輕量的、全面的批處理框架,用於開發強大的日常運營的企業級批處理應用程式。)

框架主要有以下功能:

Transaction management(事務管理)

Chunk based processing(基於塊的處理)

Declarative I/O(聲明式的輸入輸出)

Start/Stop/Restart(啟動/停止/再啟動)

Retry/Skip(重試/跳過)

如果你的批處理程序需要使用上面的功能,那就大膽地使用它吧!

框架全貌

框架一共有4個主要角色:JobLauncher是任務啟動器,通過它來啟動任務,可以看做是程序的入口。Job代表著一個具體的任務。Step代表著一個具體的步驟,一個Job可以包含多個Step(想像把大象放進冰箱這個任務需要多少個步驟你就明白了)。JobRepository是存儲數據的地方,可以看做是一個資料庫的接口,在任務執行的時候需要通過它來記錄任務狀態等等信息。

JobLauncher

JobLauncher是任務啟動器,該接口只有一個run方法:

public interface JobLauncher {

 

    public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;

 

}

除了傳入Job對象之外,還需要傳入JobParameters對象,後續講到Job再解釋為什麼要多傳一個JobParameters。通過JobLauncher可以在Java程序中調用批處理任務,也可以通過命令行或者其他框架(如定時調度框架Quartz、Web後臺框架Spring MVC)中調用批處理任務。Spring Batch框架提供了一個JobLauncher的實現類SimpleJobLauncher。

Job

Job代表著一個任務,一個Job與一個或者多個JobInstance相關聯,而一個JobInstance又與一個或者多個JobExecution相關聯:

考慮到任務可能不是只執行一次就再也不執行了,更多的情況可能是定時任務,如每天執行一次,每個星期執行一次等等,那麼為了區分每次執行的任務,框架使用了JobInstance。如上圖所示,Job是一個EndOfDay(每天最後時刻執行的任務),那麼其中一個JobInstance就代表著2007年5月5日那天執行的任務實例。框架通過在執行JobLauncher.run(Job, JobParameters)方法時傳入的JobParameters來區分是哪一天的任務。

由於2007年5月5日那天執行的任務可能不會一次就執行完成,比如中途被停止,或者出現異常導致中斷,需要多執行幾次才能完成,所以框架使用了JobExecution來表示每次執行的任務。

Step

一個Job任務可以分為幾個Step步驟,與JobExection相同,每次執行Step的時候使用StepExecution來表示執行的步驟。每一個Step還包含著一個ItemReader、ItemProcessor、ItemWriter,下面分別介紹這三者。

ItemReader

ItemReader代表著讀操作,其接口如下:

public interface ItemReader<T> {

 

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

 

}

框架已經提供了多種ItemReader接口的實現類,包括對文本文件、XML文件、資料庫、JMS消息等讀的處理,當然我們也可以自己實現該接口。

ItemProcessor

ItemReader代表著處理操作,其接口如下:

public interface ItemProcessor<I, O> {

 

    O process(I item) throws Exception;

 

}

process方法的形參傳入I類型的對象,通過處理後返回O型的對象。開發者可以實現自己的業務代碼來對數據進行處理。

ItemWriter

ItemReader代表著寫操作,其接口如下:

public interface ItemWriter<T> {

 

    void write(List<? extends T> items) throws Exception;

 

}

框架已經提供了多種ItemWriter接口的實現類,包括對文本文件、XML文件、資料庫、JMS消息等寫的處理,當然我們也可以自己實現該接口。

JobRepository

JobRepository用於存儲任務執行的狀態信息,比如什麼時間點執行了什麼任務、任務執行結果如何等等。框架提供了2種實現,一種是通過Map形式保存在內存中,當Java程序重啟後任務信息也就丟失了,並且在分布式下無法獲取其他節點的任務執行情況;另一種是保存在資料庫中,並且將數據保存在下面6張表裡:

BATCH_JOB_INSTANCE

BATCH_JOB_EXECUTION_PARAMS

BATCH_JOB_EXECUTION

BATCH_STEP_EXECUTION

BATCH_JOB_EXECUTION_CONTEXT

BATCH_STEP_EXECUTION_CONTEXT

Spring Batch框架的JobRepository支持主流的資料庫:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。可愛的是,我司的Gauss資料庫也是支持的,只不過需要稍加配置。

本文先通過一個批量下發命令的Demo教大家如何快速入門,再對框架進行一個從整體到部分的介紹,讓大家有一個基礎的認識。由於篇幅和能力有限,關於Spring Batch框架的內部實現細節和高級特性無法面面俱到,猿猿們如果需要視頻教程可以通過本公眾號對話框諮詢了解。

小編感悟:牛頓曾經說過,如果我看得比別人更遠些,那是因為我站在巨人的肩膀上。是啊,借鑑優秀的開源框架,取其精華去其糟粕,比起閉門造重複的輪子,是會看得更遠,更加成功一些!

恭喜你今天又收穫了新知識

遇到學習問題?歡迎留言或微信對話框諮詢!


喜訊 :

動力節點(2017Java全集教程)即將完成錄製,需要教程的同學現在可通過公眾號搶先預定 免費領取。

11月就業快報| 2名學員入職京東 1名入職美團..

程序猿,如何成為月入100萬的自由職業者?

11月程式語言排行榜,Java第一, iOS沒人要了?

當程序猿寫不出代碼了,怎麼辦?

從一個普通程序猿到CTO,經歷幾個階段

 一份來自阿里面試經驗

Java多線程面試題及答案

Java註解是怎樣成功上位的

零基礎入門Vip課程免費

史上最強Java學習路線

70個Java小白必背英語單詞

大型高級SVN教程精講發布

阿里巴巴Java開發者手冊

相關焦點

  • 一文輕鬆搞定批處理框架 Spring Batch
    這類工作即為「批處理」  為什麼使用Spring Batch  Spring Batch 作為 Spring 的子項目,是一款基於 Spring 的企業批處理框架。通過它可以構建出健壯的企業批處理應用。
  • Spring Batch真是個優秀的批處理框架,用完愛不釋手!
    1 前言Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量信息、定時處理大量數據等場景十分簡便。結合調度框架能更大地發揮Spring Batch的作用。
  • 通過例子講解Spring Batch入門,優秀的批處理框架
    Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產可用的特點。在應對高效處理大量信息、定時處理大量數據等場景十分簡便。結合調度框架能更大地發揮Spring Batch的作用。
  • Spring Batch 入門教程
    一個 Batch(批處理)過程由一個 Job(作業)組成。這個實體封裝了整個批處理過程。一個 Job(作業)可以由一個或多個 Step(步驟)組成。spring-boot-starter-test 包含用於測試Spring引導應用程式的依賴項。它導入了包括JUnit、Hamcrest和Mockito在內的庫。這個也有依賴性 spring-batch-test。這個庫包含一些幫助類,它們將幫助測試批處理作業。
  • Spring Batch 4.1.0.RC1,用於編寫批處理應用的框架
    Spring Batch 是一個使用 Spring 和 Java 編寫離線和批處理應用程式的框架。更新內容在這個版本中,開發團隊主要致力於使 Spring Batch 可在 Java 8,9,10 和 11 上正確構建和運行。
  • Spring Batch 4.3.0-M2 發布,批處理應用編寫框架
    Spring Batch 提供了一個名為 spring.batch.job.active 的度量,該度量給出了當前活動作業的數量。但是直到現在該指標仍無法讓用戶知道當前正在激活哪些特定作業。此版本豐富了度量標準標籤以實現此目標。3.
  • SpringBatch組合寫,複雜業務邏輯批處理必備
    作者 | 用Java打醬油編輯 | Kavien原文 | http://www.toutiao.com/a6467494519620239886/適用版本SpringBatch 3.0.8Spring4.3SpringBatch面臨的現實問題SpringBatch框架的批處理能力毋庸置疑
  • Spring Batch 4.0.0 正式發布,批處理框架
    Spring Batch 4.0.0  已發布,Spring Batch 是一個輕量級的,完全面向 Spring 的批處理框架,可以應用於企業級大量的數據處理系統。
  • SpringBatch從入門到放棄001- HelloWorld
    再 starter 裡面封裝了 batch-core。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>mysql
  • SpringBatch從入門到放棄002- 核心概念1
    根據 Spring 的加載規則,自動配置的類需要 spring.factories 中引入,所以我們打開這個文件,找到 batch 相關的配置Spring Boot2 中自動配置類org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration 但是我們在這個類中只看到 JobRepository 的引用
  • 計算機英語術語解釋:batch processing批處理
    新東方網>英語>英語學習>語法詞彙>分類詞彙>正文計算機英語術語解釋:batch processing批處理 2012-12-27 14:45 來源:英美者 作者:
  • Apex Batch和Apex Scheduler-定時批處理大量數據
    看一下英文表示,同步:Synchronize,異步:Asynchronous 批處理Apex語法 實現 :Database.Batchable 接口和三個方法:start()global (Database.QueryLocator | Iterable<sObject>) start(Database.BatchableContext
  • 相關 | 實用基礎小腳本,批處理命令Batch
    批處理命令也稱為批處理腳本,就是對某對象進行批量的處理,是一種簡化的腳本語言。批處理文件的擴展名為bat,也就是說我們在遇到的任何以bat為文件名的文件,都可以雙擊運行以及用記事本打開。下面介紹用記事本寫常用的一些批處理命令。在編寫之前,需要新建一個隨便命名的txt文檔,然後更改後綴名為bat;更改的時候會提示是否更改後綴,點擊yes就好;
  • 什麼是批標準化 (Batch Normalization)
    來自 | 知乎地址 | https://zhuanlan.zhihu.com/p/24810318作者 | 莫煩編輯 | 機器學習算法與自然語言處理公眾號Batch Normalization, 批標準化, 和普通的數據標準化類似, 是將分散的數據統一的一種做法, 也是優化神經網絡的一種方法.
  • ...谷歌新論文提出批再規範化:可降低批規範化模型中的minibatch依賴
    (非獨立同分布)minibatch 訓練時,用批再規範化訓練的模型的表現顯著優於批規範化訓練的模型。與此同時,批再規範化還保留了批規範化的優點,例如對初始化不敏感和訓練效率。算法 1:使用批再規範化的訓練(上)和推理(下),其在一個 minibatch 上應用了激活 x。在反向傳播過程中使用了標準的鏈式法則。
  • 使用Spring Cloud Data Flow 來實現數據流處理
    那麼,對於數據密集型(Data Intensive)的應用,比如定期執行的批處理(Batch Processing)或持續的實時數據流處理(Stream Processing),Spring社區是否也有對應的開源項目可以方便大家日常的工作呢?
  • 超參數調試、Batch正則化和程序框架——吳恩達DeepLearning.ai深度學習筆記之改善深層神經網絡(三)
    超參數調試處理 1.1 重要程度排序目前已經講到過的超參數中,重要程度依次是:1.2 調參技巧系統地組織超參調試過程的技巧:隨機選擇點(而非均勻選取),用這些點實驗超參數的效果。可以採用這種方式:試驗一個或一小批模型,初始化,試著讓其工作運轉,觀察它的表現,不斷調整參數;Caviar(魚子醬方式):擁有足夠的計算機去平行試驗很多模型,嘗試很多不同的超參數,選取效果最好的模型。2.
  • executeBatch()該如何運用?關於 executeBatch() 的詳解用法
    禁用自動執行使得應用程式能夠在發生錯誤及批處理中的某些命令不能執行時決定是否執行事務處理。因此,當進行批處理更新時,通常應該關閉自動執行。在JDBC 2.0 中,Statement 對象能夠記住可以一起提交執行的命令列表。創建語句時,與它關聯的命令列表為空。Statement.addBatch() 方法為調用語句的命令列表添加一個元素。
  • 使用Wordbatch對Python分布式AI後端進行基準測試
    直到最近,大部分此類大數據技術都基於Hadoop等Java框架,但軟體和硬體的變化帶來了新的解決方案類型,包括用於AI的三個主要Python分布式處理框架:PySpark,Dask和射線。 分布式批處理框架Apache Spark及其Python接口PySpark是最古老的框架,最初的GitHub版本可追溯到2010年10月4日.Spark將自己定位為主要的大數據技術之一,在企業界得到廣泛採用。
  • 詳解Spring框架的AOP機制
    ● 理解AOP的編程思想及原理● 掌握AOP的實現技術Spring框架的AOP機制可以讓開發者把業務流程中的通用功能抽取出來,單獨編寫功能代碼。在業務流程執行過程中,Spring框架會根據業務流程要求,自動把獨立編寫的功能代碼切入到流程的合適位置。