Spring Batch -扩展和并行处理

时间:2022-12-19 22:55:40

Spring Batch -扩展和并行处理

许多批处理问题可以通过单线程、单进程作业来解决, 因此,在考虑之前正确检查这是否满足您的需求总是一个好主意 关于更复杂的实现。衡量实际工作的表现,看看是否 最简单的实现首先满足您的需求。您可以读取和写入文件 不到一分钟就能达到几百兆字节,即使使用标准硬件也是如此。

当你准备好开始实现具有一些并行处理的作业时,Spring 批处理提供了一系列选项,本章将介绍这些选项,尽管有些 功能在别处介绍。在高级别上,有两种并行模式 加工:

  • 单进程、多线程
  • 多进程

这些也分为几类,如下所示:

  • 多线程步骤(单进程)
  • 并行步骤(单进程)
  • 步骤的远程分块(多进程)
  • 对步骤进行分区(单进程或多进程)

首先,我们回顾一下单进程选项。然后我们回顾多进程选项。

多线程步骤

开始并行处理的最简单方法是在步骤中添加 配置。​​TaskExecutor​

例如,您可以向 添加一个属性 ,如下所示:​​tasklet​

<step >
<tasklet task-executor="taskExecutor">...</tasklet>
</step>

使用 Java 配置时,可以在步骤中添加一个, 如以下示例所示:​​TaskExecutor​

爪哇配置

@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}

在此示例中,是对另一个 Bean 定义的引用 实现接口。TaskExecutor 是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用的详细信息 实现。最简单的多线程是 .​​taskExecutor​​​​TaskExecutor​​​​TaskExecutor​​​​SimpleAsyncTaskExecutor​

上述配置的结果是,通过读取、处理、 并将每个项目块(每个提交间隔)写入单独的执行线程中。 请注意,这意味着要处理的项目没有固定的顺序,并且块 可能包含与单线程大小写相比不连续的项目。在 除了任务执行程序设置的任何限制(例如它是否由 线程池),tasklet 配置具有限制(默认值:4)。 您可能需要增加此限制以确保线程池得到充分利用。​​Step​

例如,您可以增加限制,如下所示:

<step > <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>

使用 Java 配置时,构建器提供对限制的访问,如 遵循:

爪哇配置

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}

另请注意,在 中使用的任何池资源可能会对并发性施加限制 您的步骤,例如 .确保至少在这些资源中建立池 与步骤中所需的并发线程数一样大。​​DataSource​

使用 多线程实现存在一些实际限制 一些常见的批处理用例。许多参与者(如读者和作家) 是有状态的。如果状态未按线程隔离,则这些组件不是 可在多线程中使用。特别是大多数读者和 Spring Batch 中的编写器不是为多线程使用而设计的。然而, 可以使用无状态或线程安全的读取器和写入器,并且有一个示例 (称为)在春天 显示使用进程指示器(请参阅防止状态持久性)进行跟踪的批处理示例 已在数据库输入表中处理的项目。​​Step​​​​Step​​​​Step​​​​parallelJob​

Spring Batch 提供了 和 的一些实现。通常 他们在 Javadoc 中说它们是否线程安全,或者你必须做些什么来避免 并发环境中的问题。如果 Javadoc 中没有信息,您可以 检查实现以查看是否有任何状态。如果读取器不是线程安全的, 您可以使用提供的装饰它或自己使用它 同步委托人。可以将调用同步到 ,并且,只要 处理和写入是块中最昂贵的部分,你的步骤可能仍然 完成速度比单线程配置快得多。​​ItemWriter​​​​ItemReader​​​​SynchronizedItemStreamReader​​​​read()​

并行步骤

只要需要并行化的应用逻辑可以拆分为不同的 职责并分配给各个步骤,它可以在 单一进程。并行步骤执行易于配置和使用。

例如,并行执行步骤很简单, 如下:​​(step1,step2)​​​​step3​

<job >
<split task-executor="taskExecutor" next="step4">
<flow>
<step parent="s1" next="step2"/>
<step parent="s2"/>
</flow>
<flow>
<step parent="s3"/>
</flow>
</split>
<step parent="s4"/>
</job>

<beans:bean class="org.spr...SimpleAsyncTaskExecutor"/>

使用 Java 配置时,并行执行步骤非常简单,如下所示:​​(step1,step2)​​​​step3​

爪哇配置

@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}

@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}

@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}

@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}

@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}

可配置的任务执行器用于指定哪个实现应执行各个流。默认值为 ,但需要异步才能运行中的步骤 平行。请注意,作业可确保拆分中的每个流在之前完成 聚合退出状态和转换。​​TaskExecutor​​​​SyncTaskExecutor​​​​TaskExecutor​

有关更多详细信息,请参阅拆分流部分。

远程分块

在远程分块中,处理被拆分到多个进程中, 通过一些中间件相互通信。下图显示了 模式:​​Step​

Spring Batch -扩展和并行处理

图1.远程分块

管理器组件是单个进程,工作线程是多个远程进程。 如果管理器不是瓶颈,则此模式效果最佳,因此处理必须更多 比阅读项目昂贵(在实践中经常出现这种情况)。

管理器是弹簧批处理的实现,其中被替换的 通过知道如何将项目块发送到中间件的通用版本作为 消息。工作线程是正在使用的任何中间件的标准侦听器(对于 例如,对于 JMS,它们将是实现),它们的作用是 通过接口使用标准或加号来处理项目块。使用它的优点之一 模式是读取器、处理器和写入器组件是现成的(相同 将用于步骤的本地执行)。项目是动态划分的, 并且工作通过中间件共享,因此,如果听众都渴望 使用者,负载平衡是自动的。​​Step​​​​ItemWriter​​​​MesssageListener​​​​ItemWriter​​​​ItemProcessor​​​​ItemWriter​​​​ChunkProcessor​

中间件必须耐用,有保证交付,并且每个中间件都有一个消费者 消息。JMS是显而易见的候选者,但其他选项(如JavaSpaces)存在于 网格计算和共享内存产品空间。

有关更多详细信息,请参阅​​有关 Spring 批量集成 - 远程分块​​的部分。

分区

Spring Batch 还提供了用于对执行进行分区和执行的 SPI。 远程。在这种情况下,远程参与者是可以 易于配置并用于本地处理。下图显示了 模式:​​Step​​​​Step​

Spring Batch -扩展和并行处理

图2.分区

在左侧作为一系列实例运行,其中一个实例被标记为管理器。这张照片中的工人都是相同的 的实例,实际上可以代替管理器,导致 与 的结果相同。工作人员通常是远程服务,但 也可以是执行的本地线程。经理发送给工作人员的消息 在这种模式下不需要持久或有保证的交付。春季批次 中的元数据确保每个工作线程执行一次,并且只执行一次 每次执行。​​Job​​​​Step​​​​Step​​​​Step​​​​Job​​​​JobRepository​​​​Job​

Spring Batch 中的 SPI 由一个特殊的实现(称为 )和两个策略接口组成,这两个接口需要针对特定的 环境。策略接口是 和 , 下面的序列图显示了它们的作用:​​Step​​​​PartitionStep​​​​PartitionHandler​​​​StepExecutionSplitter​

Spring Batch -扩展和并行处理

图3.分区 SPI

在这种情况下,右边是“远程”工作者,因此,可能存在 许多对象和/或进程扮演着这个角色,并且显示驱动 执行。​​Step​​​​PartitionStep​

以下示例显示了使用 XML 时的配置 配置:​​PartitionStep​

<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>

以下示例显示了使用 Java 时的配置 配置:​​PartitionStep​

爪哇配置

@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}

与多线程步骤的属性类似,该属性可防止任务执行器被来自单个请求的请求饱和 步。​​throttle-limit​​​​grid-size​

与多线程步骤的方法类似,该方法可防止任务执行器被来自单个请求的请求饱和 步。​​throttleLimit​​​​gridSize​

春季单元测试套件 批处理示例(请参阅配置)有一个简单的示例,您可以复制和扩展。​​partition*Job.xml​

Spring 批处理为调用的分区创建步骤执行,因此 上。许多人更喜欢调用经理步骤以保持一致性。您可以 为步骤使用别名(通过指定属性而不是属性)。​​step1:partition0​​​​step1:manager​​​​name​​​​id​

分区处理程序

​PartitionHandler​​是知道远程处理结构的组件,或者 网格环境。它能够向远程实例发送请求,以某种特定于结构的格式(如 DTO)包装。它不必知道 如何拆分输入数据或如何聚合多次执行的结果。 一般来说,它可能也不需要了解复原能力或故障转移, 因为在许多情况下,这些都是织物的特征。无论如何,弹簧批次总是 提供独立于结构的可重启性。失败的始终可以重新启动, 在这种情况下,只会重新执行失败的。​​StepExecution​​​​Step​​​​Step​​​​Job​​​​Steps​

该接口可以具有针对各种 结构类型,包括简单的 RMI 远程处理、EJB 远程处理、定制 Web 服务、JMS、Java 空间、共享内存网格(如兵马俑或连贯)和网格执行结构 (如网格增益)。Spring Batch 不包含任何专有网格的实现 或远程织物。​​PartitionHandler​

但是,Spring Batch确实提供了有用的实现。 使用 Spring 中的策略在单独的执行线程中本地执行实例。该实现称为 。​​PartitionHandler​​​​Step​​​​TaskExecutor​​​​TaskExecutorPartitionHandler​

是使用 XML 配置的步骤的默认值 前面显示的命名空间。您还可以显式配置它,如下所示:​​TaskExecutorPartitionHandler​

<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>

您可以显式配置 与 Java 配置, 如下:​​TaskExecutorPartitionHandler​

爪哇配置

@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}

@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}

该属性确定要创建的单独步骤执行的次数,因此 它可以与 中的线程池大小匹配。或者,它 可以设置为大于可用线程数,这使得块 工作更小。​​gridSize​​​​TaskExecutor​

对于 IO 密集型实例非常有用,例如 将大量文件拷贝或将文件系统复制到内容管理中 系统。它还可以通过提供实现来用于远程执行 这是远程调用的代理(例如使用 Spring Remoting)。​​TaskExecutorPartitionHandler​​​​Step​​​​Step​

分区程序

具有更简单的职责:生成执行上下文作为输入 仅新步骤执行的参数(无需担心重新启动)。它有一个 单一方法,如以下接口定义所示:​​Partitioner​

public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个步骤执行的唯一名称 () 与 .名字显示 稍后在批处理元数据中作为分区中的步骤名称。这只是一袋名称-值对,因此可能包含一系列 主键、行号或输入文件的位置。遥控器然后 通常使用占位符绑定到上下文输入(步骤中的后期绑定 范围),如下一节所示。​​String​​​​ExecutionContext​​​​StepExecutions​​​​ExecutionContext​​​​Step​​​​#{…}​

步骤执行的名称(返回者 中的键)需要 在 A 的步骤执行中是唯一的,但没有任何其他特定 要求。执行此操作(并使名称对用户有意义)的最简单方法是 使用前缀+后缀命名约定,其中前缀是步骤 正在执行(它本身在 中是唯一的,后缀只是一个 计数器。在使用此约定的框架中有一个。​​Map​​​​Partitioner​​​​Job​​​​Job​​​​SimplePartitioner​

您可以使用调用的可选接口来提供分区 与分区本身分开的名称。如果实现此 界面,则在重新启动时仅查询名称。如果分区成本高昂, 这可能是一个有用的优化。必须提供的名称 匹配 提供的那些。​​PartitionNameProvider​​​​Partitioner​​​​PartitionNameProvider​​​​Partitioner​

将输入数据绑定到步骤

对于由 执行的步骤来说,这是非常有效的 相同的配置,并且它们的输入参数在运行时从 .这很容易通过Spring Batch的StepScope功能来实现。 (在延迟绑定部分中有更详细的介绍)。为 例如,如果使用属性键创建实例 调用 ,指向每个步骤调用的不同文件(或目录), 输出可能类似于下表的内容:​​PartitionHandler​​​​ExecutionContext​​​​Partitioner​​​​ExecutionContext​​​​fileName​​​​Partitioner​

表 1.目标目录处理提供的执行上下文的示例步骤执行名称​​Partitioner​

步骤执行名称(键)

执行上下文(值)

文件复制:分区0

文件名=/主页/数据/一

文件复制:分区 1

文件名=/主页/数据/二

文件复制:分区2

文件名=/主页/数据/三

然后,可以使用到执行上下文的后期绑定将文件名绑定到步骤。

下面的示例演示如何在 XML 中定义后期绑定:

XML 配置

<bean  scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>

以下示例演示如何在 Java 中定义后期绑定:

爪哇配置

@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}