Spring Batch 跑批框架

时间:2022-10-24 08:14:28
SpringBatch的框架包括启动批处理作业的组件和存储Job执行产生的元数据. 如果作为一个批处理应用程序的开发人员,你暂时没有必要跟这些组件打交道,
因为它们主要为我们提供组件支持的角色,但是您需要配置这个基础框架在你的批处理应用程序中.
 
启动Jobs并存储Job元数据.
SpringBatch框架本身还是比较复杂的,但是我们开始接触它的时候主要是处理两个组件:Job Launcher和Job Repository. 这两个概念对应着java接口分别是:
JobLauncher和JobRepository.
 
JobLauncher
对于Spring Batch这个框架里的小世界而言. JobLauncher是连接它跟外面世界的一个入口,具体SpringBatch如何跟外界进行交互,文章结尾会进行简单的说明.
相对而言JobLauncher接口还是比较简单的,如下:
Spring Batch 跑批框架Spring Batch 跑批框架
 
批处理是企业级业务系统不可或缺的一部分,spring batch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的
数据批量处理业务员.SpringBatch基于POJO和Spring框架,容易上手,让开发者很容易访问和利用企业级服务.
spring batch具有高可扩展性的框架,简单的批处理,复杂的大数据批处理作业都可以通过SpringBatch框架来实现.
 
 
一个轻量级的、全面的批处理框架,旨在支持对企业系统的日常操作至关重要的健壮批处理应用程序的开发。Spring Batch提供了可重用的功能,这些功能在处理大量记录时非常重要,包括日志/跟踪、事务管理、作业处理统计、作业重新启动、跳过和资源管理。它还提供了更高级的技术服务和特性,这些服务和特性将通过优化和分区技术支持极高容量和高性能的批处理作业。简单以及复杂、高容量的批处理作业可以以高度可伸缩的方式利用框架来处理大量信息。
Spring Batch 跑批框架Spring Batch 跑批框架
Spring Batch 跑批框架Spring Batch 跑批框架
技术目标:
  • 批处理开发人员使用Spring编程模型: 专注于业务逻辑,并让框架处理基础结构
  • 在基础结构,批处理执行环境和批处理应用程序之间明确分离关注点.
  • 提供通用的核心执行服务作为所有项目都可以实现的接口
  • 提供可以直接使用的核心执行接口的简单和默认实现.
  • 通过在所有层中利用spring框架,轻松配置,定制和扩展服务.
  • 所有现有的核心服务应易于替换或扩展,而不会影响基础架构层.
  • 提供一个简单的部署模型,其架构JAR与使用Maven构建的应用程序完全分开.
 

sf.gg
Spring Batch 结构
首先,Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情.
一个Job包含很多Step,step就是每个job要执行的单个步骤.
如下图,Step(步骤)里面,会有Tasklet(小任务,任务单元),Tasklet是一个任务单元,它是属于可以重复利用的东西.
然后是Chunk(数据块),chunk就是数据块,你需要定义多大的数据量是一个chunk.
Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据.Spring Batch会不断的循环这个流程,直到批处理数据完成.
Spring Batch 跑批框架
 
构建Spring Batch :
首先,我们需要一个全局的Configuration来配置所有的Job和一些全局配置.
@Configuration@EnableAutoConfiguration@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
    @Bean
    public ApplicationContextFactory firstJobContext() {
        return new GenericApplicationContextFactory(FirstJobConfiguration.class);
    }
    
    @Bean
    public ApplicationContextFactory secondJobContext() {
        return new GenericApplicationContextFactory(SecondJobConfiguration.class);
    }
 
 
}
两个bean的实例化,每个bean是一个任务配置
注解@EnableBatchProcessing(modular = true)
这个注解是打开Batch,如果要实现多Job的情况,需要把EnableBatchProcessing注解
modular设置为true,让每个Job使用自己的ApplicationContext.
比如上面代码就创建了两个Job.
Spring Batch 跑批框架Spring Batch 跑批框架
这里做一个实体类映射,加入@Entity, @Table注解
 
首先构建Job
首先我们需要一个关于这个Job的Configuration,它将在SpringBatchConfiguration里被加载.
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
    @Bean
    public ApplicationContextFactory messageMigrationJobContext() {
        return new GenericApplicationContextFactory(MessageMigrationJobConfiguration,class);
    }
}
下面的关于构建Job的代码都将写在这个MessageMigrationJobConfiguration里面
public class MessageMigrationJobConfiguration {
    //调数据 执行 写数据
}
我们先定义一个Job的Bean
@Autowired
private JobBuilderFactory jobBuilderFactory;
 
@Bean
public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) {
    return jobBuilderFactory.get("messageMigrationJob")
            .start(messageMigrationStep)
            .build();
}
jobBuilderFactory是注入进来的,get里面的就是job的名字.
这个job只有一个step.
 
Step
接下来就是创建Step
@Autowired
private StepBuilderFactory stepBuilderFactory;
 
@Bean
public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
                                 @Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
                                 @Qualifier("errorWriter") Writer errorWriter) {
    return stepBuilderFactory.get("messageMigrationStep")
            .<Message, Message>chunk(CHUNK_SIZE)
            .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageItemReadListener(errorWriter))
            .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageWriteListener())
            .build();
}
stepBuilderFactory是注入进来的,然后get里面是Step的名字.
我们的Step中可以构建很多东西,比如reader,processer,writer,listener等等.
下面我们就逐个来看看step里面的这些东西是如何使用的.
 
Chunk
Spring batch在配置Step时采用的是基于Chunk的机制,即每次读取一条数据,再处理一条数据,累积到
一定数量后再一次性交给writer进行写入操作. 这样可以最大化的优化写入效率,整个事务也是基于Chunk
来进行.
比如我们定义chunk size是50,那就意味着,spring batch处理了50条数据后,再统一向数据库写入.
这里有个很重要的点,chunk前面需要定义数据输入类型和输出类型,由于我们输入是Message,输出
也是Message,所以两个都直接写Message了.
如果不定义这个类型,会报错.
.<Message,Message> chunk(CHUNK_SIZE)
 
Reader (读取器)
Reader顾名思义就是从数据源读取数据.
Spring Batch给我们提供了很多好用实用的reader, 基本能满足我们所有需求.
比如FlatFileItemReader, JdbcCursorItemReader, JpaPagingItemReader 等.
也可以自己实现Reader.
本例子里面,数据源是文本文件,所以我们就使用FlatFileItemReader. FlatFileItemReader是从文件里面
一行一行的读取数据.
首先需要设置文件路径,也就是设置resource.
因为我们需要把一行文本映射为Message类,所以我们需要自己设置并实现LineMapper.
@Bean
public FlatFileItemReader<Message> jsonMessageReader() {
    FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
    reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
    reader.setLineMapper(new MessageLineMapper());
    return reader;
}
 
Line Mapper (行读取工具)
LineMapper的输入就是获取一行文本,和行号,然后转换成Message.
在本例子里面,一行文本就是一个json对象,所以我们使用JsonParser来转换成Message.
public class MessageLineMapper implements LineMapper<Message> {
    private MappingJsonFactory factory = new MappingJsonFactory();
    
    @Override
    public Message mapLine(String line,int lineNumber) throws Exception {
        JsonParser parser = factory.createParser(line);
        Map<String,Object> map = (Map)parser.readValueAs(Map.class);
        Message message = new Message();
        //转换逻辑
        return message;
    }
}
 
 
Processor (处理程序)
由于本例子里面,数据是一行文本,通过reader变成Message的类,然后writer直接把Message写入MySQL.
所以我们的例子里面就不需要Processor, 关于如何写Processor其实和reader/writer是一样的道理.
从它的接口可以看出,需要定义输入和输出的类型,把输入I通过某些逻辑处理之后,返回输出O.
public interface ItemProcessor<I,O> {
    O process(I item) throws Exception;
}
 
Writer (写入者)
Writer顾名思义就是把数据写入到目标数据源里面.
Spring Batch同样给我们提供很多好用实用的writer. 比如JpaItemWriter, FlatFileItemWriter, HibernateItemWriter, JdbcBatchItemWriter.
同样也可以自定义.
本例子里面,使用的是JpaItemWriter, 可以直接把Message对象写入到数据库里面, 但是需要设置一个EntityManagerFactory,可以注入进来.
@Autowired
private EntityManagerFactory entityManager;
 
@Bean
public JpaItemWriter<Message> messageItemWriter() {
    JpaItemWriter<Message> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManager);
    return writer;
}
另外,你需要配置数据库的连接等东西, 由于我使用的spring,所以直接在Application.properties里面配置如下:
spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
...
...
 
spring.datasource相关的设置都是在配置数据库的连接.
spring.batch.initialize-schema=always表示让spring batch在数据库里面创建默认的数据表.
 
 
Listener
Spring Batch同样实现了非常完善全面的listener, listener很好理解,就是用来监听每个步骤的结果.
比如可以有监听step的, job, reader,writer的.
本例博主关心的是read的时候有没有出错,和write的时候有没有出错,所以,我只实现了ReadListener和WriteListener
 
在read出错的时候,把错误结果写入一个单独的error列表文件中.
public class MessageItemReadListener implements ItemReadListener<Message> {
    private Writer errorWriter;
 
    public MessageItemReadListener(Writer errorWriter) {
        this.errorWriter = errorWriter;
    }
 
    @Override
    public void beforeRead(){
    
    }
 
    @Override
    public void afterRead(Message item){
    
    }
 
    @Override
    public void onReadError(Exception ex){
        errorWriter.write(format("%s%n",ex.getMessage()));
    }
 
}
 
在write出错的时候,也做同样的事情, 把出错的原因写入单独的日志中.
public class MessageWriteListener implements ItemWriteListener<Message> {
 
 
    @Autowired
    private Writer errorWriter;
 
 
    @Override
    public void beforeWrite(List<? extends Message> items) {
    }
 
 
    @Override
    public void afterWrite(List<? extends Message> items) {
    }
 
 
    @Override
    public void onWriteError(Exception exception, List<? extends Message> items) {
        errorWriter.write(format("%s%n", exception.getMessage()));
        for (Message message : items) {
            errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
        }
    }
}
 
前面有说chunk机制,所以write的listener传入参数是一个List,因为它是累积到一定的数量才一起写入.
 
 
Skip
Spring Batch提供了skip的机制,也就是说,如果出错了,可以跳过. 如果你不设置skip,那么一条数据出错了,整个job都会挂掉.
设置skip的时候一定要设置什么Exception才需要跳过,并且跳过多少条数据. 如果失败的数据超过你设置的skip limit,那么job就会失败.
你可以分别给reader和writer等设置skip机制.
writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
 
 
Retry
这个和Skip是一样的原理,就是失败之后可以重试,你同样需要设置重试的次数.
同样可以分别给reader,writer等设置retry机制.
如果同时设置了retry和skip,会先重试所有次数,然后再开始skip, 比如retry是10次,skip是20,会先重试10次之后,再开始算第一次skip.
 
 
运行Job
这里博主说的Job运行方法是通过main方法,获取到args的参数从jobRegistry中获取该job, 通过jobLauncher.run运行该job
public static void main(String[] args) {
    String jobName = args[0];
 
 
    try {
        ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
        JobRegistry jobRegistry = context.getBean(JobRegistry.class);
        Job job = jobRegistry.getJob(jobName);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        JobExecution jobExecution = jobLauncher.run(job, createJobParams());
        if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            throw new RuntimeException(format("%s Job execution failed.", jobName));
        }
    } catch (Exception e) {
        throw new RuntimeException(format("%s Job execution failed.", jobName));
    }
}
 
 
private static JobParameters createJobParams() {
    return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
 
 

 
public static void main(String[] args) {
    String jobName = args[0];
 
    try {
        ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
        JobRegistry jobRegistry = context.getBean(JobRegistry.class);
        Job job = jobRegistry.getJob(jobName);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        JobExecution jobExecution = jobLauncher.run(job, createJobParams());
        if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            throw new RuntimeException(format("%s Job execution failed.", jobName));
        }
    } catch (Exception e) {
        throw new RuntimeException(format("%s Job execution failed.", jobName));
    }
}
 
private static JobParameters createJobParams() {
    return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
 

 
最后,把jar包编译出来,在命令行执行下面的命令,就可以运行你的Spring Batch了
java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
 
从这可以看出批处理的执行可以分为一个个微服务 (springboot) 去单独运行.
 

 
info调试
调试主要依靠控制台输出的log,可以在application.properties里面设置log输出的级别,比如你希望输出INFO信息还是DEBUG信息。
基本上,通过查看log都能定位到问题。
logging.path=build/logs
logging.file=${logging.path}/batch.log
logging.level.com.easystudio=INFO
logging.level.root=INFO
log4j.logger.org.springframework.jdbc=INFO
log4j.logger.org.springframework.batch=INFO
logging.level.org.hibernate.SQL=INFO
 
Spring Batch数据表
如果你的batch最终会写入数据库,那么Spring Batch会默认在你的数据库里面创建一些batch相关的表.
来记录所有job/step运行的状态和结果.
 
如果你的batch最终会写入数据库,那么Spring Batch会默认在你的数据库里面创建一些batch相关的表,
来记录所有job/step运行的状态和结果.
 
大部分表你都不需要关心,你只需要关心几张表.
Spring Batch 跑批框架Spring Batch 跑批框架
 
batch_job_instance : 这张表能看到每次运行的job名字:
Spring Batch 跑批框架Spring Batch 跑批框架
batch_job_execution : 这张表能看到每次运行job的开始时间,结束时间,状态,以及失败后的错误消息是什么.
Spring Batch 跑批框架Spring Batch 跑批框架
 
batch_step_execution : 这张表你能看到更多关于step的详细信息,比如step的开始时间,结束时间,提交次数,读写次数,状态,以及失败后的错误信息等.
Spring Batch 跑批框架Spring Batch 跑批框架
总结
Spring Batch为我们提供了非常实用的功能,对批处理场景进行了完善的抽象,它不仅能实现小数据的迁移,也能应对大企业的大数据实践应用.
它让我们开发批处理应用可以事半功倍.
 
tips: 搭建Spring Batch的过程中,会遇到各种各样的问题, 只要善用Google,都能找到答案.