spring batch的批处理框架简单介绍 - Spring

标签:
杂谈 |
分类: spring |
有关spring batch的介绍我就不多说了,可以去下面的网址看看:
http://www.infoq.com/cn/news/2008/07/spring-batch-zh
刚接触到spring batch的时候无从下手,javaeye有关的帖子博文也非常的少,很郁闷只能看它自己提供的文档,说真的,那文档帮助不大,顶多就是让你知道spring batch靠这么几个类玩的。没办法只能自己一步步看代码调试,走了不少弯路呢。
这篇文章简单介绍一下spring batch是怎么处理单个文件的。
首先看下spring batch关键的几个类:
http://image15.360doc.com/DownloadImg/2010/09/2114/5445621_1.jpgbatch的批处理框架简单介绍
JobLauncher负责启动Job,Job中干事的是Step,想让Step干活,就要给它点工具,itemReader,ItemProcessor,itemWriter就是它需要的工具,这些工具可以是你自己提供,以可以用spring batch现成的。就看实际情况了。
接下来就具体看看这些类究竟包含了什么东西,干了哪些事情。
先看配置文件:
- <bean
id="jobLauncher" -
class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> -
<property name="jobRepository" ref="jobRepository" /> - </bean>
-
-
- <bean
id="jobRepository"class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean " -
p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" />
- <!--
对账单批处理 begin --> -
<bean id="checkSheetFileImportJob" parent="simpleJob"> -
<property name="steps"> -
<list> -
<bean id="checkSheetTransactionLoa dStep" parent="simpleStep"> -
<property name="commitInterval" value="3"/> -
<property name="allowStartIfComplete" value="true"/> -
<property name="itemReader" ref="checkSheetTransactionIte mReader" /> -
<property name="itemWriter" ref="checkSheetTransactionWri ter" /> -
</bean> -
</list> -
</property> -
</bean> -
-
<bean id="checkSheetTransactionWri ter" class="com.longtop.netbank.checksheet.CheckSheetTransactionJDB CWriter" > -
<property name="dataSource" ref="dataSource"/> -
<property name="resource" ref="checkSheetFileResource"></property> -
</bean> -
-
<bean id="checkSheetTransactionIte mReader" class="org.springframework.batch.item.file.FlatFileItemReader"> -
<property name="resource" ref="checkSheetFileResource"/> -
<property name="linesToSkip" value="0"/> -
<property name="lineMapper"> -
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> -
<property name="lineTokenizer"> -
<bean -
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> -
<property name="names" -
value="accountNo,transactionDate,transactionType,oppositeAccountNo,oppositeAccountNickName,summary,lending,transactionAmount,balance"/> -
</bean> -
</property> -
<property name="fieldSetMapper"> -
<bean class="com.longtop.netbank.checksheet.CheckSheetTransactionFie ldSetMapper" /> -
</property> -
</bean> -
</property> -
</bean>
-
<bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true"> -
<property name="jobRepository" ref="jobRepository" /> -
<property name="restartable" value="true" /> - </bean>
-
- <bean
id="simpleStep" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean" -
abstract="true"> -
<property name="transactionManager" ref="transactionManager" /> -
<property name="jobRepository" ref="jobRepository" /> -
<property name="commitInterval" value="1" /> - </bean>
在示例中,step的工作就是通过itemReader读取数据文件,然后用itemProcessor进行处理,然后通过itemWriter写入到数据库中,示例中的step也是用的springbatch自带的类SimpleStepFactoryBean。
对于SimpleStepFactoryBean需要花功夫好好的看看它究竟做了哪些事情。
-
public
class SimpleStepFactoryBean<T,S> implements FactoryBean, BeanNameAware { -
-
private
int commitInterval = 0; -
-
-
public final Object getObject() throws Exception { -
TaskletStep step = new TaskletStep(getName()); -
applyConfiguration(step); -
return step; -
} -
-
public Class<Step> getObjectType() { -
return Step.class; -
} - ...
-
}
commitInterval
如果commitInterval=10,数据文件有30个数据,当读到写到24个数据的时候抛出了异常,那么成功写入数据库的数据以后20条,第21--第24条数据放弃。下次如果执行断点回复时,就从第21条数据开始
-
-
private CompletionPolicy getChunkCompletionPolicy () { -
Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0), -
"You must );specify either a chunkCompletionPolicy or a commitInterval but not both." -
Assert.state(commitInterval >= 0, "The commitInterval );must be positive or zero (for default value)." -
-
if (chunkCompletionPolicy != null) { -
return chunkCompletionPolicy; -
} -
if (commitInterval == 0) { -
logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")"); -
commitInterval = DEFAULT_COMMIT_INTERVAL; -
} -
return new SimpleCompletionPolicy(commitInterval); -
}
下面就大概的讲下执行步骤(只贴类名和部分代码,类名方便查找源代码):
1 jobLauncher调用job:
- public
class SimpleJobLauncher implements JobLauncher, InitializingBean { -
-
public JobExecution run(final Job job, final JobParameters jobParameters) -
throws JobExecutionAlreadyRunni ngException, JobRestartException, JobInstanceAlreadyComple teException { -
-
Assert.notNull(job, "The Job );must not be null." -
Assert.notNull(jobParameters, "The JobParameters );must not be null." -
-
boolean exists = jobRepository.isJobInstanceExists(job.getName(), jobParameters); -
if (exists && !job.isRestartable()) { -
throw new JobRestartException("JobInstance already );exists and is not restartable" -
} -
-
final JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); -
-
taskExecutor.execute(new Runnable() { -
-
public void run() { -
try { -
-
job.execute(jobExecution); -
-
} -
catch (Throwable t) { -
logger.info("Job: [" + job + "] failed with the following parameters: [" + jobParameters + "]", t); -
rethrow(t); -
} -
} -
-
private void rethrow(Throwable t) { -
if (t instanceof RuntimeException) { -
throw (RuntimeException) t; -
} -
throw new RuntimeException(t); -
} -
}); -
-
return jobExecution; -
}
2 job执行step
- public
class SimpleJob extends AbstractJob { -
public void execute(JobExecution execution) { - ...
-
getCompositeListener().beforeJob(execution); -
-
StepExecution lastStepExecution = handleSteps(steps, execution); - ...
-
try { -
getCompositeListener().afterJob(execution); -
} - ...
-
}
那么程序会根据最后一个step的stepExecution来更新jobExecution.
3 step的执行
- public
abstract class AbstractStep implements Step, InitializingBean, BeanNameAware { -
-
-
public final void execute(StepExecution stepExecution) throws JobInterruptedException, -
UnexpectedJobExecutionEx ception 『 -
... -
} -
}
-
open(stepExecution.getExecutionContext());
也就是调用 FlatFileItemReader的open方法,因为在配置文件中itemReader的实现类型就是FlatFileItemReader,在SimpleStepBeanFacotry的applyConfiguration中讲FlatFileItemReader配置到TaskletStep中。(详情请看代码)。
然后调用TaskletStep的doExecute方法
-
exitStatus
= doExecute(stepExecution);
- return
stepOperations.iterate(ReapCallBack callback)
-
exitStatus
= tasklet.execute(contribution, attributes);
SimpleChunkOrientedTaskl
这里有一点饶人的地方就是stepOperations.iterate了,看代码的时候需要明确的一点是stepOperations(RepeatTemplate)是使用CompletionPolicy接口来控制循环的,对于TaskletStep和SimpleChunkOrientedTaskl
applyConfiguration方法。
上面介绍的就是一个job的执行过程。因为是第一次写。所以很多地方都写有说清楚,如果有疑问,请提出来。
附件是demo,要创建一个springbatch的数据就可以运行。当然这个demo有小陷阱(每次执行的时候会删除数据表重新创建)大家可以根据需要修改一下,当然如果每次都删除数据表的话,就看不到spring batch断点回复的功能了。
因为附件太大不能上传,各位只能自己去下了。