(點擊上方公眾號,可快速關注)
學習問題歡迎留言或對話框諮詢
▼
你好,歡迎來到Java基礎知識分解站,今天小編為猿猿們整理了Spring批處理框架的內容,掌握這些知識,可以幫你省去一些造輪子的過程,提高開發效率。本文由博主姚兆峰分享,小編整理後推送,希望對你的工作有幫助。
在大型的企業應用中,或多或少都會存在大量的任務需要處理,如郵件批量通知所有將要過期的會員等等。而在批量處理任務的過程中,又需要注意很多細節,如任務異常、性能瓶頸等等。那麼,使用一款優秀的框架總比我們自己重複地造輪子要好得多一些。
我所在的物聯網雲平臺部門就有這麼一個需求,需要實現批量下發命令給百萬設備。為了防止枯燥乏味,下面就讓我們先通過Spring Batch框架簡單地實現一下這個功能,再來詳細地介紹這款框架!
引入依賴
首先我們需要引入對Spring Batch的依賴,在pom.xml文件加入下面的代碼:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>3.0.8.RELEASE</version>
</dependency>
裝載Bean
其次,我們需要在resources目錄下,創建applicationContext.xml文件,用於自動注入我們需要的類:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 事務管理器 -->
<bean id="transactionManager"/>
<!-- 任務倉庫 -->
<bean id="jobRepository">
<property name="transactionManager" ref="transactionManager"/>
</bean>
<!-- 任務加載器 -->
<bean id="jobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
</beans>
有了上面聲明的transactionManager、jobRepository、jobLauncher,我們就可以執行批量任務啦!不過,我們還需要創建一個任務。在Spring Batch框架中,一個任務Job由一個或者多個步驟Step,而步驟又由讀操作Reader、處理操作Processor、寫操作Writer組成,下面我們分別創建它們。
創建Reader
既然是讀操作,那麼肯定要有能讀的數據源,方便起見,我們直接在resources目錄下創建一個batch-data.csv文件,內容如下:
1,PENDING
2,PENDING
3,PENDING
4,PENDING
5,PENDING
6,PENDING
7,PENDING
8,PENDING
9,PENDING
10,PENDING
非常簡單,其中第一列代表著命令的id,第二列代表著命令的當前狀態。也就是說,現在有10條緩存的命令,需要下發給設備。
讀操作需要實現ItemReader<T>接口,框架提供了一個現成的實現類FlatFileItemReader。使用該類需要設置Resource和LineMapper。Resource代表著數據源,即我們的batch-data.csv文件;LineMapper則表示如何將文件的每行數據轉成對應的DTO對象。
創建DTO對象
由於我們的數據源是命令數據,所以我們需要創建一個DeviceCommand.java文件,代碼如下:
public class DeviceCommand {
private String id;
private String status;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
自定義LineMapper
我們需要自己實現一個LineMapper實現類,用於將batch-data.csv文件的每行數據,轉成程序方便處理的DeviceCommand對象。
public class HelloLineMapper implements LineMapper<DeviceCommand> {
@Override
public DeviceCommand mapLine(String line, int lineNumber) throws Exception {
// 逗號分割每一行數據
String[] args = line.split(",");
// 創建DeviceCommand對象
DeviceCommand deviceCommand = new DeviceCommand();
// 設置id值到對象中
deviceCommand.setId(args[0]);
// 設置status值到對象中
deviceCommand.setStatus(args[1]);
// 返回對象
return deviceCommand;
}
}
創建Processor
讀完數據後,我們就需要處理數據了。既然我們前面從文件裡讀取了待下發的命令,那麼在這裡下發命令給設備是最好的時機。處理操作需要實現ItemProcessor<I, O>接口,我們自己實現一個HelloItemProcessor.java即可,代碼如下:
public class HelloItemProcessor implements ItemProcessor<DeviceCommand, DeviceCommand> {
@Override
public DeviceCommand process(DeviceCommand deviceCommand) throws Exception {
// 模擬下發命令給設備
System.out.println("send command to device, id=" + deviceCommand.getId());
// 更新命令狀態
deviceCommand.setStatus("SENT");
// 返回命令對象
return deviceCommand;
}
}
創建Writer
處理完數據後,我們需要更新命令狀態到文件裡,用於記錄我們已經下發。與讀文件類似,我們需要實現ItemWriter<T>接口,框架也提供了一個現成的實現類FlatFileItemWriter。使用該類需要設置Resource和LineAggregator。Resource代表著數據源,即我們的batch-data.csv文件;LineAggregator則表示如何將DTO對象轉成字符串保存到文件的每行。
自定義LineAggregator
我們需要自己實現一個LineAggregator實現類,用於將DeviceCommand對象轉成字符串,保存到batch-data.csv文件。
public class HelloLineAggregator implements LineAggregator<DeviceCommand> {
@Override
public String aggregate(DeviceCommand deviceCommand) {
StringBuffer sb = new StringBuffer();
sb.append(deviceCommand.getId());
sb.append(",");
sb.append(deviceCommand.getStatus());
return sb.toString();
}
}
主程序
那麼,完事具備,只欠東風!接下面我們在主程序Main.java裡實現我們的批量命令下發功能!代碼如下:
public class Main {
public static void main(String[] args) throws Exception {
// 加載上下文
String[] configLocations = {"applicationContext.xml"};
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(configLocations);
// 獲取任務啟動器
JobLauncher jobLauncher = applicationContext.getBean(JobLauncher.class);
JobRepository jobRepository = applicationContext.getBean(JobRepository.class);
PlatformTransactionManager transactionManager = applicationContext.getBean(PlatformTransactionManager.class);
// 創建reader
FlatFileItemReader<DeviceCommand> flatFileItemReader = new FlatFileItemReader<>();
flatFileItemReader.setResource(new FileSystemResource("src/main/resources/batch-data.csv"));
flatFileItemReader.setLineMapper(new HelloLineMapper());
// 創建processor
HelloItemProcessor helloItemProcessor = new HelloItemProcessor();
// 創建writer
FlatFileItemWriter<DeviceCommand> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setResource(new FileSystemResource("src/main/resources/batch-data.csv"));
flatFileItemWriter.setLineAggregator(new HelloLineAggregator());
// 創建Step
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
Step step = stepBuilderFactory.get("step")
.<DeviceCommand, DeviceCommand>chunk(1)
.reader(flatFileItemReader) // 讀操作
.processor(helloItemProcessor) // 處理操作
.writer(flatFileItemWriter) // 寫操作
.build();
// 創建Job
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
Job job = jobBuilderFactory.get("job")
.start(step)
.build();
// 啟動任務
jobLauncher.run(job, new JobParameters());
}
}
執行main方法之後,屏幕將會輸出下面信息:
send command to device, id=1
send command to device, id=2
send command to device, id=3
send command to device, id=4
send command to device, id=5
send command to device, id=6
send command to device, id=7
send command to device, id=8
send command to device, id=9
send command to device, id=10
再查看batch-data.csv文件,將會發現命令狀態全部更新為SENT:
1,SENT
2,SENT
3,SENT
4,SENT
5,SENT
6,SENT
7,SENT
8,SENT
9,SENT
10,SENT
至此,我們的批量命令下發全部成功!可以發現,使用Spring Batch框架來實現批處理非常的輕量,當然這只是它所有功能裡的冰山一角。
Spring Batch在官網是這樣一句話介紹自己的:
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款輕量的、全面的批處理框架,用於開發強大的日常運營的企業級批處理應用程式。)
框架主要有以下功能:
Transaction management(事務管理)
Chunk based processing(基於塊的處理)
Declarative I/O(聲明式的輸入輸出)
Start/Stop/Restart(啟動/停止/再啟動)
Retry/Skip(重試/跳過)
如果你的批處理程序需要使用上面的功能,那就大膽地使用它吧!
框架全貌
框架一共有4個主要角色:JobLauncher是任務啟動器,通過它來啟動任務,可以看做是程序的入口。Job代表著一個具體的任務。Step代表著一個具體的步驟,一個Job可以包含多個Step(想像把大象放進冰箱這個任務需要多少個步驟你就明白了)。JobRepository是存儲數據的地方,可以看做是一個資料庫的接口,在任務執行的時候需要通過它來記錄任務狀態等等信息。
JobLauncher
JobLauncher是任務啟動器,該接口只有一個run方法:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
除了傳入Job對象之外,還需要傳入JobParameters對象,後續講到Job再解釋為什麼要多傳一個JobParameters。通過JobLauncher可以在Java程序中調用批處理任務,也可以通過命令行或者其他框架(如定時調度框架Quartz、Web後臺框架Spring MVC)中調用批處理任務。Spring Batch框架提供了一個JobLauncher的實現類SimpleJobLauncher。
Job
Job代表著一個任務,一個Job與一個或者多個JobInstance相關聯,而一個JobInstance又與一個或者多個JobExecution相關聯:
考慮到任務可能不是只執行一次就再也不執行了,更多的情況可能是定時任務,如每天執行一次,每個星期執行一次等等,那麼為了區分每次執行的任務,框架使用了JobInstance。如上圖所示,Job是一個EndOfDay(每天最後時刻執行的任務),那麼其中一個JobInstance就代表著2007年5月5日那天執行的任務實例。框架通過在執行JobLauncher.run(Job, JobParameters)方法時傳入的JobParameters來區分是哪一天的任務。
由於2007年5月5日那天執行的任務可能不會一次就執行完成,比如中途被停止,或者出現異常導致中斷,需要多執行幾次才能完成,所以框架使用了JobExecution來表示每次執行的任務。
Step
一個Job任務可以分為幾個Step步驟,與JobExection相同,每次執行Step的時候使用StepExecution來表示執行的步驟。每一個Step還包含著一個ItemReader、ItemProcessor、ItemWriter,下面分別介紹這三者。
ItemReader
ItemReader代表著讀操作,其接口如下:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
框架已經提供了多種ItemReader接口的實現類,包括對文本文件、XML文件、資料庫、JMS消息等讀的處理,當然我們也可以自己實現該接口。
ItemProcessor
ItemReader代表著處理操作,其接口如下:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
process方法的形參傳入I類型的對象,通過處理後返回O型的對象。開發者可以實現自己的業務代碼來對數據進行處理。
ItemWriter
ItemReader代表著寫操作,其接口如下:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
框架已經提供了多種ItemWriter接口的實現類,包括對文本文件、XML文件、資料庫、JMS消息等寫的處理,當然我們也可以自己實現該接口。
JobRepository
JobRepository用於存儲任務執行的狀態信息,比如什麼時間點執行了什麼任務、任務執行結果如何等等。框架提供了2種實現,一種是通過Map形式保存在內存中,當Java程序重啟後任務信息也就丟失了,並且在分布式下無法獲取其他節點的任務執行情況;另一種是保存在資料庫中,並且將數據保存在下面6張表裡:
BATCH_JOB_INSTANCE
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_EXECUTION
BATCH_STEP_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_STEP_EXECUTION_CONTEXT
Spring Batch框架的JobRepository支持主流的資料庫:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。可愛的是,我司的Gauss資料庫也是支持的,只不過需要稍加配置。
本文先通過一個批量下發命令的Demo教大家如何快速入門,再對框架進行一個從整體到部分的介紹,讓大家有一個基礎的認識。由於篇幅和能力有限,關於Spring Batch框架的內部實現細節和高級特性無法面面俱到,猿猿們如果需要視頻教程可以通過本公眾號對話框諮詢了解。
小編感悟:牛頓曾經說過,如果我看得比別人更遠些,那是因為我站在巨人的肩膀上。是啊,借鑑優秀的開源框架,取其精華去其糟粕,比起閉門造重複的輪子,是會看得更遠,更加成功一些!
恭喜你今天又收穫了新知識
遇到學習問題?歡迎留言或微信對話框諮詢!
喜訊 :
動力節點(2017Java全集教程)即將完成錄製,需要教程的同學現在可通過公眾號搶先預定 免費領取。
11月就業快報| 2名學員入職京東 1名入職美團..
程序猿,如何成為月入100萬的自由職業者?
11月程式語言排行榜,Java第一, iOS沒人要了?
當程序猿寫不出代碼了,怎麼辦?
從一個普通程序猿到CTO,經歷幾個階段
一份來自阿里面試經驗
Java多線程面試題及答案
Java註解是怎樣成功上位的
零基礎入門Vip課程免費
史上最強Java學習路線
70個Java小白必背英語單詞
大型高級SVN教程精講發布
阿里巴巴Java開發者手冊