加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

Springbatch教程之扩展与并行处理

(2019-02-25 11:25:57)
分类: java笔记

https://blog.csdn.net/github_36849773/article/details/69226174


很多批处理问题都可以通过单进程、单线程的工作模式来完成, 所以在想要做一个复杂设计和实现之前,请审查你是否真的需要那些超级复杂的实现。 衡量实际作业(job)的性能,看看最简单的实现是否能满足需求: 即便是最普通的硬件,也可以在一分钟内读写上百MB数据文件。



当你准备使用并行处理技术来实现批处理作业时,Spring Batch提供一系列选择,本章将对他们进行讲述,虽然某些功能不在本章中涵盖。从高层次的抽象角度看,并行处理有两种模式: 单进程,多线程模式; 或者多进程模式。还可以将他分成下面这些种类:


·         多线程Step(单个进程)

·         并行Steps(单个进程)

·         远程分块Step(多个进程)

·         Step分区 (/多个进程)



下面我们先回顾一下单进程方式,然后再看多进程方式.


1.1 多线程 Step


启动并行处理最简单的方式就是在 Step 配置中加上一个TaskExecutor , 比如,作为 tasklet 的一个属性:


 

1.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="loading">

2.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">tasklet task-executor="taskExecutor">...</<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">tasklet>

3.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step>



上面的示例中, taskExecutor指向了另一个实现 TaskExecutor 接口的Bean. TaskExecutor 是一个标准的Spring接口,具体有哪些可用的实现类,请参考 Spring用户指南. 最简单的多线程TaskExecutor 实现是 SimpleAsyncTaskExecutor.


以上配置的结果就是在 Step(每次提交的块)记录的读取,处理,写入时都会在单独的线程中执行。请注意,这段话的意思就是在要处理的数据项之间没有了固定的顺序, 并且一个非连续块可能包含项目相比,单线程的例子。此外executor还有一些限制(例如,如果它是由一个线程池在后台执行的),有一个tasklet的配置项可以调整,throttle-limit默认为4。你可能根据需要增加这个值以确保线程池被充分利用,: 

1.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="loading">

2.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">tasklet task-executor="taskExecutor"

3.  throttle-limit="20">...</<span style="overflow-wrap: break-word; box-sizing: border-box;outline: 0px">tasklet>

4.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step>

 

 

还需要注意在step中并发使用连接池资源时可能会有一些限制,例如数据库连接池 DataSource. 请确保连接池中的资源数量大于或等于并发线程的数量.


在一些常见的批处理情景中,对使用多线程Step有一些实际的限制。Step中的许多部分(readers writers)是有状态的,如果某些状态没有进行线程隔离,那么这些组件在多线程Step中就是不可用的。特别是大多数Spring Batch提供的readers writers不是为多线程而设计的。但是,我们也可以使用无状态或线程安全的readers writers,可以参考Spring Batch Samples(parallelJob)的这个示例(点击进入Section 6.12, “Preventing State Persistence”),示例中展示了通过指示器来跟踪数据库input表中的哪些项目已经被处理过,而哪些还没有被处理。


Spring Batch 提供了ItemWriter ItemReader 的一些实现. 通常在javadoc中会指明是否是线程安全的,或者指出在并发环境中需要注意哪些问题. 假若文档中没有明确说明,你只能通过查看源代码来看看是否有什么线程不安全的共享状态. 一个并非线程安全的 reader , 也可以在你自己处理了同步的代理对象中高效地使用.


如果你的step中写操作和处理操作所消耗的时间更多,那么即使你对read()操作加锁进行同步,也会比你在单线程环境中执行要快很多.


1.2 并行 Steps


只要需要并行的程序逻辑可以划分为不同的职责,并分配给各个独立的step,那么就可以在单个进程中并行执行。并行Step执行很容易配置和使用,例如,将执行步骤(step1,step2)和步骤3step3并行执行,则可以向下面这样配置一个流程: 

1.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">job id="job1">

2.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">split id="split1" task-executor="taskExecutor" next="step4">

3.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">flow>

4.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="step1" parent="s1" next="step2"/>

5.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="step2" parent="s2"/>

6.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">flow>

7.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">flow>

8.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="step3" parent="s3"/>

9.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">flow>

10.</<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">split>

11.<<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="step4" parent="s4"/>

12.</<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">job>

13.<<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

 

可配置的 "task-executor" 属性是用来指明应该用哪个TaskExecutor实现来执行独立的流程。默认是SyncTaskExecutor,但有时需要使用异步的TaskExecutor来并行运行某些步骤。请注意,这项工作将确保每一个流程在聚合之前完成.并进行过渡。


1.3 远程分块(Remote Chunking)



使用远程分块的Step被拆分成多个进程进行处理,多个进程间通过中间件实现通信. 下面是一幅模型示意图:


Master组件是单个进程,从属组件(Slaves)一般是多个远程进程。如果Master进程不是瓶颈的话,那么这种模式的效果几乎是最好的,因此应该在处理数据比读取数据消耗更多时间的情况下使用(实际应用中常常是这种情形)


Master组件只是Spring Batch Step 的一个实现, 只是将ItemWriter替换为一个通用的版本,这个通用版本 "知道" 如何将数据项的分块作为消息(messages)发送给中间件。 从属组件(Slaves)是标准的监听器(listeners),不论使用哪种中间件(如使用JMS时就是MesssageListeners ), Slaves的作用都是处理数据项的分块(chunks), 可以使用标准的 ItemWriter 或者是ItemProcessor加上一个 ItemWriter, 使用的接口是 ChunkProcessor interface。使用此模式的一个优点是: reader,processorwriter 组件都是现成的(就和在本机执行的step一样)。数据项被动态地划分,工作是通过中间件共享的,因此,如果监听器都是饥饿模式的消费者,那么就自动实现了负载平衡。


中间件必须持久可靠,能保证每个消息都会被分发,且只分发给单个消费者。JMS是很受欢迎的解决方案,但在网格计算和共享内存产品空间里还有其他可选的方式(Java Spaces服务;Java对象提供分布式的共享存储)

1.4 分区


Spring Batch也为Step的分区执行和远程执行提供了一个SPI(服务提供者接口)。在这种情况下,远端的执行程序只是一些简单的Step实例,配置和使用方式都和本机处理一样容易。下面是一幅实际的模型示意图:


在左侧执行的作业(Job)是串行的Steps,而中间的那一个Step被标记为 Master。图中的Slave 都是一个Step的相同实例,对于作业来说,这些Slave的执行结果实际上等价于就是Master的结果。Slaves通常是远程服务,但也有可能是本地执行的其他线程。在此模式中,Master发送给Slave的消息不需要持久化(durable) ,也不要求保证交付: 对每个作业执行步骤来说,保存在JobRepository 中的Spring Batch元信息将确保每个Slave都会且仅会被执行一次。


Spring BatchSPIStep的一个专门的实现( PartitionStep),以及需要由特定环境实现的两个策略接口组成。这两个策略接口分别是PartitionHandlerStepExecutionSplitter,他们的角色如下面的序列图所示:




此时在右边的Step就是“远程”Slave,所以可能会有多个对象 和/或 进程在扮演这一角色,而图中的PartitionStep 在驱动(/控制)整个执行过程。PartitionStep的配置如下所示:

1.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="step1.master">

2.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">partition step="step1" partitioner="partitioner">

3.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">handler grid-size="10" task-executor="taskExecutor"/>

4.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">partition>

5.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step>


类似于多线程stepthrottle-limit 属性, grid-size属性防止单个Step的任务执行器过载。

Spring Batch Samples示例程序中有一个简单的例子在单元测试中可以拷贝/扩展(详情请参考 *PartitionJob.xml 配置文件)

Spring Batch 为分区创建执行步骤,名如“step1:partition0,等等,所以我们经常把Master step叫做“step1:master”。在Spring 3.0中也可以为Step指定别名(通过指定 name 属性,而不是 id 属性)

1.4.1 分区处理器(PartitionHandler)


PartitionHandler组件知道远程网格环境的组织结构。 它可以发送StepExecution请求给远端Steps,采用某种具体的数据格式,例如DTO.它不需要知道如何分割输入数据,或者如何聚合多个步骤执行的结果。一般来说它可能也不需要了解弹性或故障转移,因为在许多情况下这些都是结构的特性,无论如何Spring Batch总是提供了独立于结构的可重启能力: 一个失败的作业总是会被重新启动,并且只会重新执行失败的步骤。

PartitionHandler接口可以有各种结构的实现类: 如简单RMI远程方法调用,EJB远程调用,自定义web服务、JMSJava
Spaces,
共享内存网格(TerracottaCoherence)、网格执行结构(GridGain)Spring Batch自身不包含任何专有网格或远程结构的实现。




但是 Spring Batch也提供了一个有用的PartitionHandler实现,在本地分开的线程中执行Steps,该实现类名为
TaskExecutorPartitionHandler,
并且他是上面的XML配置中的默认处理器。还可以像下面这样明确地指定:

1.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step id="step1.master">

2.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">partition step="step1" handler="handler"/>

3.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">step>

4.     

5.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">bean class="org.spr...TaskExecutorPartitionHandler">

6.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">property name="taskExecutor" ref="taskExecutor"/>

7.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">property name="step" ref="step1" />

8.  <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">property name="gridSize" value="10" />

9.  </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">bean>


gridSize决定要创建的独立的step执行的数量,所以它可以配置为TaskExecutor中线程池的大小,或者也可以设置得比可用的线程数稍大一点,在这种情况下,执行块变得更小一些。


TaskExecutorPartitionHandler 对于IO密集型步骤非常给力,比如要拷贝大量的文件,或复制文件系统到内容管理系统时。它还可用于远程执行的实现,通过为远程调用提供一个代理的步骤实现(例如使用Spring Remoting)

1.4.2 分割器(Partitioner)


分割器有一个简单的职责: 仅为新的step实例生成执行环境(contexts),作为输入参数(这样重启时就不需要考虑)。 该接口只有一个方法:public interface Partitioner {

2.  Map partition(int gridSize);

3.  }

这个方法的返回值是一个Map对象,将每个Step执行分配的唯一名称(Map泛型中的 String),和与其相关的输入参数以

ExecutionContext 的形式做一个映射。 这个名称随后在批处理 meta data 中作为分区 StepExecutionsStep名字显示。ExecutionContext仅仅只是一些 名-值对的集合,所以它可以包含一系列的主键,或行号,或者是输入文件的位置。 然后远程Step 通常使用 #{...}占位符来绑定到上下文输入(step作用域内的后期绑定),详情请参见下一节。

https://blog.csdn.net/github_36849773/article/details/69226174

 

step执行的名称( Partitioner接口返回的 Map 中的 key)在整个作业的执行过程中需要保持唯一,除此之外没有其他具体要求。要做到这一点,并且需要一个对用户有意义的名称,最简单的方法是使用 前缀+后缀 的命名约定,前缀可以是被执行的Step的名称(这本身在作业Job中就是唯一的),后缀可以是一个计数器。在框架中有一个使用此约定的 SimplePartitioner


有一个可选接口 PartitioneNameProvider 可用于和分区本身独立的提供分区名称。 如果一个 Partitioner 实现了这个接口,那么重启时只有names会被查询。 如果分区是重量级的,那么这可能是一个很有用的优化。 很显
,PartitioneNameProvider提供的名称必须和Partitioner提供的名称一致。


1.4.3 将输入数据绑定到 Steps



因为step的输入参数在运行时绑定到ExecutionContext,所以由相同配置的PartitionHandler执行的steps是非常高效的。 通过 Spring BatchStepScope特性这很容易实现(详情请参考 后期绑定)。 例如,如果 Partitioner 创建 ExecutionContext 实例, 每个step执行都以fileNamekey 指向另一个不同的文件(或目录),Partitioner 的输出看起来可能像下面这样:

1.1. 由执行目的目录处理Partitioner提供的的step执行上下文名称示例


1.  | Step Execution Name (key) | ExecutionContext (value) |

2.  | filecopy:partition0 | fileName=/home/data/one |

3.  | filecopy:partition1 | fileName=/home/data/two |

4.  | filecopy:partition2 | fileName=/home/data/three |



然后就可以将文件名绑定到 step, step使用了执行上下文的后期绑定:

1.       <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">bean id="itemReader" scope="step" class="org.spr...MultiResourceItemReader">

2.       <<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">property name="resource" value="#{stepExecutionContext[fileName]}/*"/>

 

3.       </<span style="overflow-wrap: break-word;box-sizing: border-box;outline: 0px">bean>



    0

    阅读 收藏 喜欢 打印举报/Report
      

    新浪BLOG意见反馈留言板 欢迎批评指正

    新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

    新浪公司 版权所有