Springboot集成Spring Batch

时间:2023-03-09 16:01:01
Springboot集成Spring Batch

Spring官网 (https://spring.io/projects/spring-batch#overview)对Spring  Batch的解释:


     一个轻量级的、全面的批处理框架,用于开发对企业系统的日常操作至关重要的健壮批处理应用程序。

Spring Batch提供了处理大量记录所必需的可重用功能,包括日志/跟踪、事务管理、作业处理统计信息、

作业重启、跳过和资源管理。它还提供了更高级的技术服务和特性,这些特性将通过优化和分区技术支持极高

容量和高性能的批处理作业。无论简单还是复杂,大容量批处理作业都可以以高度可伸缩的方式利用框架来处

理大量信息。

Features(特性):

  • Transaction management   事务管理
  • Chunk based processing    基于块的处理
  • Declarative I/O声明式I / O
  • Start/Stop/Restart                 启动/停止/启动
  • Retry/Skip                              重试/跳过
  • Web based administration interface (Spring Cloud Data Flow)       基于Web的管理接口(Spring Cloud Data Flow)

    Spring batch主要有以下部分组成:

  • JobRepository       用来注册job的容器
  • JobLauncher              用来启动Job的接口
  • Job                               实际执行的任务,包含一个或多个Step
  • Step                             step包含ItemReader、ItemProcessor和ItemWriter
  • ItemReader                 用来读取数据的接口
  • ItemProcessor            用来处理数据的接口
  • ItemWriter                   用来输出数据的接口

Springboot集成Spring Batch

一个Job有一个或多个Step组成,Step有读、处理、写三部分操作组成;通过JobLauncher启动Job,启动时从

JobRepository获取Job Execution;

当前运行的Job及Step的结果及状态保存在JobRepository中。

下面是Springboot集成Spring Batch 的 一个Demo:

1.在idea下新建一个springboot项目

    pom.xml

    

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId>
<artifactId>springbatch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging> <name>springbatch</name>
<description>Demo project for Spring Boot</description> <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency> <dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency> <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build> </project>

ItemReader

public class Reader implements ItemReader<String> {

    private String[] messages = { "篮球之神 Michael Jordan",
"欢迎来到batch示例 Welcome to Spring Batch Example",
"The more time goes by,The more life happens" }; private int count = 0; @Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException{
if(count<messages.length){
return messages[count++];
}else{
count=0;
}
return null;
}

ItemProcessor

public class Processor implements ItemProcessor<String,String> {
@Override
public String process(String data) throws Exception{
return data.toUpperCase();
}
}

ItemWriter

public class Writer implements ItemWriter<String> {

    @Override
public void write(List<? extends String> messages) throws Exception{
for(String msg:messages){
System.out.println("输出信息:"+ msg);
} }
}

Listener

public class JobCompletionListener extends JobExecutionListenerSupport {

    @Override
public void beforeJob(JobExecution jobExecution){
if(jobExecution.getStatus()== BatchStatus.STARTED){
System.out.println("批处理执行开始....");
}
}
@Override
public void afterJob(JobExecution jobExecution){
if(jobExecution.getStatus()== BatchStatus.COMPLETED){
System.out.println("批处理执行结束....");
} }
}

Config

@Configuration
public class BatchConfig { @Autowired
public JobBuilderFactory jobBuilderFactory; @Autowired
public StepBuilderFactory stepBuilderFactory; @Bean
public Job processJob(){
return jobBuilderFactory.get("processJob")
.incrementer(new RunIdIncrementer()).listener(listener())
.flow(orderStep1()).end().build();
} @Bean
public Step orderStep1() {
return stepBuilderFactory.get("orderStep1").<String, String> chunk(1)
.reader(new Reader()).processor(new Processor())
.writer(new Writer()).build();
} @Bean
public JobExecutionListener listener() {
return new JobCompletionListener();
}
}

Controller

@RestController
@EnableScheduling
public class JobInvokerController { @Autowired
JobLauncher jobLauncher; @Autowired
Job processJob; //每20秒执行一次
@Scheduled(cron = "*/20 * * * * * ")
public void handle() throws Exception { JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(processJob, jobParameters); }
}

启动项目:每20秒执行一次批处理

Springboot集成Spring Batch

代码地址:https://github.com/liuchunbo24/Springboot-SpringBatch-Demo