创建基本批处理驱动解决方案

时间:2022-12-22 11:24:45

创建基本批处理驱动解决方案

本指南将引导您完成创建基本批处理驱动解决方案的过程。

您将构建什么

您将构建一个服务,该服务从 CSV 电子表格导入数据,使用自定义代码对其进行转换,并将最终结果存储在数据库中。

你需要什么

  • 约15分钟
  • 最喜欢的文本编辑器或 IDE
  • JDK 1.8或以后
  • 格拉德尔 4+​或梅文 3.2+
  • 您也可以将代码直接导入到 IDE 中:
  • 弹簧工具套件 (STS)
  • 智能理念
  • VSCode

如何完成本指南

像大多数春天一样入门指南,您可以从头开始并完成每个步骤,也可以绕过您已经熟悉的基本设置步骤。无论哪种方式,您最终都会得到工作代码。

要从头开始,请继续从 Spring 初始化开始.

要跳过基础知识,请执行以下操作:

  • 下载​并解压缩本指南的源存储库,或使用吉特:git clone https://github.com/spring-guides/gs-batch-processing.git
  • 光盘成gs-batch-processing/initial
  • 跳转到创建商务舱.

完成后,您可以根据 中的代码检查结果。​​gs-batch-processing/complete​

业务数据

通常,您的客户或业务分析师会提供电子表格。对于这个简单的示例,您可以在以下位置找到一些虚构的数据:​​src/main/resources/sample-data.csv​

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

此电子表格在每行上包含一个名字和一个姓氏,用逗号分隔。这是一种相当常见的模式,Spring 无需自定义即可处理。

接下来,您需要编写一个 SQL 脚本来创建用于存储数据的表。您可以在以下位置找到这样的脚本:​​src/main/resources/schema-all.sql​

DROP TABLE people IF EXISTS;

CREATE TABLE people (
person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
first_name VARCHAR(20),
last_name VARCHAR(20)
);

Spring Boot 在启动期间自动运行。 是所有平台的默认值。​​schema-@@platform@@.sql​​​​-all​

从 Spring 初始化开始

你可以使用这个预初始化项目,然后单击生成以下载 ZIP 文件。此项目配置为适合本教程中的示例。

手动初始化项目:

  1. 导航到https://start.spring.io.此服务拉入应用程序所需的所有依赖项,并为您完成大部分设置。
  2. 选择 Gradle 或 Maven 以及您要使用的语言。本指南假定您选择了 Java。
  3. 单击“依赖项”,然后选择“Spring Batch 和 HyperSQL 数据库”。
  4. 单击生成
  5. 下载生成的 ZIP 文件,该文件是配置了您选择的 Web 应用程序的存档。

如果您的 IDE 集成了 Spring Initializr,则可以从 IDE 完成此过程。

您也可以从 Github 分叉项目,然后在 IDE 或其他编辑器中打开它。

创建商务舱

现在,您可以看到数据输入和输出的格式,您可以编写代码来表示一行数据,如以下示例 (from ) 所示:​​src/main/java/com/example/batchprocessing/Person.java​

package com.example.batchprocessing;

public class Person {

private String lastName;
private String firstName;

public Person() {
}

public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}

public void setFirstName(String firstName) {
this.firstName = firstName;
}

public String getFirstName() {
return firstName;
}

public String getLastName() {
return lastName;
}

public void setLastName(String lastName) {
this.lastName = lastName;
}

@Override
public String toString() {
return "firstName: " + firstName + ", lastName: " + lastName;
}

}

可以通过构造函数或通过设置属性来实例化具有名字和姓氏的类。​​Person​

创建中间处理器

批处理中的常见范例是引入数据,转换数据,然后将其传输到其他位置。在这里,您需要编写一个简单的转换器,将名称转换为大写。以下清单(来自 )显示了如何执行此操作:​​src/main/java/com/example/batchprocessing/PersonItemProcessor.java​

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();

final Person transformedPerson = new Person(firstName, lastName);

log.info("Converting (" + person + ") into (" + transformedPerson + ")");

return transformedPerson;
}

}

​PersonItemProcessor​​实现 Spring Batch 的接口。这样可以轻松地将代码连接到将在本指南后面定义的批处理作业中。根据接口,您会收到一个传入对象,然后将其转换为大写 .​​ItemProcessor​​​​Person​​​​Person​

输入和输出类型不必相同。实际上,读取一个数据源后,有时应用程序的数据流需要不同的数据类型。

将批处理作业放在一起

现在,您需要将实际的批处理作业放在一起。Spring Batch 提供了许多实用程序类,可以减少编写自定义代码的需要。相反,您可以专注于业务逻辑。

要配置作业,必须首先创建一个 Spring 类,如以下示例所示。此示例使用基于内存的数据库,这意味着,完成后,数据将消失。现在将以下 bean 添加到您的类中以定义读取器、处理器和编写器:​​@Configuration​​​​src/main/java/com/exampe/batchprocessing/BatchConfiguration.java​​​​BatchConfiguration​

@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}

@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
.dataSource(dataSource)
.build();
}

第一个代码块定义输入、处理器和输出。

  • ​reader()​​创建一个 .它查找一个调用的文件,并使用足够的信息解析每个行项,以将其转换为 .ItemReadersample-data.csvPerson
  • ​processor()​​创建您之前定义的 的实例,用于将数据转换为大写。PersonItemProcessor
  • ​writer(DataSource)​​创建一个 .这个针对的是 JDBC 目标,并自动获取由 创建的数据源的副本。它包括插入单个 、由 Java Bean 属性驱动的 SQL 语句。ItemWriter@EnableBatchProcessingPerson

最后一个块(来自)显示了实际的作业配置:​​src/main/java/com/example/batchprocessing/BatchConfiguration.java​

@Bean
public Job importUserJob(JobRepository jobRepository,
JobCompletionNotificationListener listener, Step step1) {
return new JobBuilder("importUserJob", jobRepository)
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}

@Bean
public Step step1(JobRepository jobRepository,
PlatformTransactionManager transactionManager, JdbcBatchItemWriter<Person> writer) {
return new StepBuilder("step1", jobRepository)
.<Person, Person> chunk(10, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}

第一种方法定义作业,第二种方法定义单个步骤。作业是从步骤构建的,其中每个步骤都可能涉及读取器、处理器和编写器。

在此作业定义中,您需要一个增量器,因为作业使用数据库来维护执行状态。然后列出每个步骤(尽管此作业只有一个步骤)。作业结束,Java API 生成完美配置的作业。

在步骤定义中,您可以定义一次要写入的数据量。在这种情况下,它一次最多写入十条记录。接下来,使用前面注入的 Bean 配置读取器、处理器和写入器。

​chunk()​​​带有前缀,因为它是泛型方法。这表示每个处理“块”的输入和输出类型,并与 和 对齐。​​<Person,Person>​​​​ItemReader<Person>​​​​ItemWriter<Person>​

批处理配置的最后一位是在作业完成时获得通知的一种方式。下面的示例 (from ) 显示了这样的类:​​src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java​

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

private final JdbcTemplate jdbcTemplate;

@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}

@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Time to verify the results");

jdbcTemplate.query("SELECT first_name, last_name FROM people",
(rs, row) -> new Person(
rs.getString(1),
rs.getString(2))
).forEach(person -> log.info("Found <" + person + "> in the database."));
}
}
}

侦听作业何时存在,然后用于检查结果。​​JobCompletionNotificationListener​​​​BatchStatus.COMPLETED​​​​JdbcTemplate​

使应用程序可执行

尽管批处理可以嵌入到 Web 应用程序和 WAR 文件中,但下面演示的更简单方法会创建一个独立的应用程序。您将所有内容打包到一个可执行的 JAR 文件中,该文件由一个很好的旧 Java 方法驱动。​​main()​

Spring 初始化器为您创建了一个应用程序类。对于这个简单的示例,它无需进一步修改即可工作。以下清单(来自 )显示了应用程序类:​​src/main/java/com/example/batchprocessing/BatchProcessingApplication.java​

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

public static void main(String[] args) throws Exception {
System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
}
}

​@SpringBootApplication​​是一个方便的注释,它添加了以下所有内容:

  • ​@Configuration​​:将类标记为应用程序上下文的 Bean 定义源。
  • ​@EnableAutoConfiguration​​:告诉 Spring 引导根据类路径设置、其他 bean 和各种属性设置开始添加 bean。例如,如果 在类路径上,则此注释会将应用程序标记为 Web 应用程序并激活关键行为,例如设置 .spring-webmvcDispatcherServlet
  • ​@ComponentScan​​:告诉 Spring 在包中查找其他组件、配置和服务,让它找到控制器。com/example

该方法使用 Spring Boot 的方法启动应用程序。您是否注意到没有一行 XML?也没有文件。此 Web 应用程序是 100% 纯 Java,您无需处理配置任何管道或基础结构。​​main()​​​​SpringApplication.run()​​​​web.xml​

请注意这一点,并确保 JVM 在作业完成后退出。请参阅的​​SpringApplication.exit()​​​​System.exit()​​Spring 引导参考文档中的应用程序退出部分了解更多详情。

出于演示目的,有用于创建 、 查询数据库并打印出批处理作业插入的人员姓名的代码。​​JdbcTemplate​

构建可执行的 JAR

您可以使用 Gradle 或 Maven 从命令行运行应用程序。您还可以构建一个包含所有必需依赖项、类和资源的可执行 JAR 文件并运行该文件。通过构建可执行 jar,可以轻松地在整个开发生命周期中跨不同环境等将服务作为应用程序进行交付、版本控制和部署。

如果使用 Gradle,则可以使用 .或者,您可以使用 JAR 文件生成 JAR 文件,然后运行该文件,如下所示:​​./gradlew bootRun​​​​./gradlew build​

java -jar build/libs/gs-batch-processing-0.1.0.jar

如果使用 Maven,则可以使用 运行应用程序。或者,您可以使用 JAR 文件生成 JAR 文件,然后运行该文件,如下所示:​​./mvnw spring-boot:run​​​​./mvnw clean package​

java -jar target/gs-batch-processing-0.1.0.jar

此处描述的步骤将创建一个可运行的 JAR。你也可以构建经典 WAR 文件.

作业为每个被转换的人打印出一行。作业运行后,还可以查看查询数据库的输出。它应类似于以下输出:

Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
Found <firstName: JILL, lastName: DOE> in the database.
Found <firstName: JOE, lastName: DOE> in the database.
Found <firstName: JUSTIN, lastName: DOE> in the database.
Found <firstName: JANE, lastName: DOE> in the database.
Found <firstName: JOHN, lastName: DOE> in the database.

总结

祝贺!您构建了一个批处理作业,该作业从电子表格中引入数据,对其进行处理,然后将其写入数据库。