ITPub博客

首页 > 应用开发 > Java > spring batch入门实例

spring batch入门实例

Java 作者:xz43 时间:2015-11-02 10:46:20 0 删除 编辑
转载地址:http://blog.csdn.net/limiteewaltwo/article/details/8832771

为什么要进行批处理,哪些场合适合进行批处理?

[plain] view plaincopy
  1. 企业领域的许多应用需要在严格的任务环境下,进行大量的商业计算。这些商业计算包括在大数据集中进行自动而又复杂的计算,而且这些计算不需要人工的干预。这些任务中经典的就是基于时间驱动的事件的复杂商业规则的周期性的应用,在大数据集上的重复计算,或者从内部、外部系统接收到的信息的集成,这些信息需要格式化、校验、像一个事务一样处理。批处理能为公司每天处理10亿计的数据。  

Spring batch官方的文档中是这样介绍自己的:

[plain] view plaincopy
  1. Spring batch是一个轻量级的,易理解的批处理框架,它被设计成用于健壮的批处理应用,特别是企业系统的日常操作。Spring batch构建于产业化的、基于POJO的开发探索,能轻松地使用Spring框架的扩展能力,必要的时候,开发者也能很轻松地进行扩展。Spring batch不是一个任务框架。有很多的商业化的或者开源的任务框架,比如:Quartz,Tivoli,Control-M,等等。Spring batch是用来和这些协同工作,而不是取代它们。  
[plain] view plaincopy
  1. Spring Batch提供了一些处理大容量数据的必要的可重用的函数。包括日志、事物管理,任务进度统计,任务重启,跳过,资源管理。它也提供了一些更加高级的技术服务和特性,允许处理高容量,高性能批处理任务,来应对过程优化和分布式计算。简单也很复杂,高容量的批处理任务能够以一种高度可伸缩的方式提升框架处理大量信息的能力。  
[plain] view plaincopy
  1. 当开源软件项目和相关的软件组织把他们的注意力放在基于web的和SOA消息驱动的结构框架上,基于java的可重用的批处理框架就显得那样的少,尽管企业的IT环境的需求却一直在增长。一个标准的可重用的批处理框架的缺乏导致了许多一次性的,家庭的解决方案出现在客户的IT函数中。  
  2. SpringSource和Accenture合作来改变这样情况,Accenture在自己产业上的批处理经验加上SpringSource在技术经验上的深度,和Spring已经证明的技术模型。产生了一个自然的,强大的团队,创建了个一高质量的,市场化的软件,增补了企业JAVA在批处理上的缺失。两个公司都在致力于帮一些客户解决一些相同的问题,开发基于Spring的批处理结构解决方案。这提供了一些有用的附加细节和实际的原则,帮助我们处理现实中客户提出的实际问题。因为这些和其它的一些原因,SpringSource和Acceture组建了一个团队合作开发Spring Batch。  
  3. Acceture贡献了前面所说的批处理的结构框架,基于在前几代系统构建批处理的经验,包括Spring Batch中的资源列表,扩展,和蓝图。  
  4. Acceture和Spring Batch这间的合作就是为了促进企业开发者在开发批处理应用时,能在软件处理过程中的探索、框架、工具中进行拓展。公司和政府机构,想在他们的企业IT环境中分发标准的,已经被证明的解决方案时,就会从Spring Batch中受益。  

Spring Batch的使用场景:

[plain] view plaincopy
  1. 一个经典的批处理程序是这样的:从数据库、文件、或者队列中读取大量的记录,以一种模式时行处理,然后以一种修改后的形式写回。Spring Batch自动化了这个批处理叠代,提供了以一个数据集的方式处理相似事物的能力,典型的就是没有用户交互的离线环境。批处理任务是大多数IT项目的一部分,而且Spring Batch是唯一的一个提供开源的、健壮的、企业级可伸缩性的框架。  
  2. 业务场景:  
  3. 1、阶段性的批处理提交  
  4. 2、同时处理:并行的任务处理  
  5. 3、分步的、企业级的任务处理  
  6. 4、大量的并行批处理任务  
  7. 5、失败之后的手动或都计划性重启  
  8. 6、独立步骤的连续处理  
  9. 7、部分处理:跳过错误项  
  10. 8、完全的批处理事务  
  11.   
  12. 技术目标:  
  13. 1、批处理开发者使用Spring技术模型,专注于业务逻辑,让框架来处理底层的细节;  
  14. 2、概念和底层的清晰分离,批处理的执行环境和批处理应用;  
  15. 3、以接口的方式提供一个一般化的、核心的运行服务,所有的项目都能实现它;  
  16. 4、提供核心接口的默认简单的实现,让开发者很容易上手;  
  17. 5、很简单的配置、个性化、扩展服务,通过在Spring的各个层进行扩展;  
  18. 6、提供一个简单的部署模型,JAR包和应用分离,使用maven进行构建。  

Spring Batch的结构

[plain] view plaincopy
  1. Spring Batch在设计的时候考虑到了扩展性和各种最终开发的用户的多样性。下面的图展示了分层框架的骨架,支持扩展和开发者的易用性。  

[plain] view plaincopy
  1. 分层结构展示了分层结构的三个主要部件:应用层,核心层,和基础结构层。一个应用包含了所有的批处理任务和开发者用Spring Batch写的代码。Core层包含了一些核心的运行时类,用来运行来管理一个批处理任务。它包括了像JobLauncher和Job,还有Step这样的实现。应用层和核心层都是构建在基础结构层之上。基础结构层包括了readers和writers,还有services,比如RetryTemplate,这些被应用开发者和核心框架共用。  


这是一篇spring batch的入门实例,如果你对spring的一些基本概念还不是很了解,可能会有难度,本文也假设你已经对maven有一些了解。
首先新建一个maven项目,注意是jar项目,pom.xml如下:

[html] view plaincopy
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3.     <modelVersion>4.0.0</modelVersion>  
  4.     <groupId>com.test.springbatch</groupId>  
  5.     <artifactId>spring-batch</artifactId>  
  6.     <version>0.0.1-SNAPSHOT</version>  
  7.     <name>spring-batch</name>  
  8.     <description>spring-batch</description>  
  9.     <properties>  
  10.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  11.         <org.springframework.version>3.0.7.RELEASE</org.springframework.version>  
  12.     </properties>  
  13.     <dependencies>  
  14.         <!-- 测试用 start -->  
  15.         <dependency>  
  16.             <groupId>junit</groupId>  
  17.             <artifactId>junit</artifactId>  
  18.             <version>4.11</version>  
  19.             <scope>test</scope>  
  20.         </dependency>  
  21.         <dependency>  
  22.             <groupId>org.slf4j</groupId>  
  23.             <artifactId>slf4j-api</artifactId>  
  24.             <version>1.6.1</version>  
  25.         </dependency>  
  26.         <dependency>  
  27.             <groupId>org.slf4j</groupId>  
  28.             <artifactId>slf4j-log4j12</artifactId>  
  29.             <version>1.6.1</version>  
  30.         </dependency>  
  31.         <dependency>  
  32.             <groupId>log4j</groupId>  
  33.             <artifactId>log4j</artifactId>  
  34.             <version>1.2.16</version>  
  35.         </dependency>  
  36.         <dependency>  
  37.             <groupId>org.springframework</groupId>  
  38.             <artifactId>spring-core</artifactId>  
  39.             <version>${org.springframework.version}</version>  
  40.         </dependency>  
  41.         <dependency>  
  42.             <groupId>org.springframework.batch</groupId>  
  43.             <artifactId>spring-batch-core</artifactId>  
  44.             <version>2.1.9.RELEASE</version>  
  45.         </dependency>  
  46.     </dependencies>  
  47. </project>  

spring-batch的配置文件(applicationContext-batch-myfirstbatch.xml)如下:
[html] view plaincopy
  1. <beans:beans xmlns="http://www.springframework.org/schema/batch"  
  2.     xmlns:beans="http://www.springframework.org/schema/beans" xmlns:bean="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xsi:schemaLocation="http://www.springframework.org/schema/beans   
  5.     http://www.springframework.org/schema/beans/spring-beans.xsd  
  6.     http://www.springframework.org/schema/batch  
  7.     http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">  
  8.     <bean:bean id="jobRepository"  
  9.         class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">  
  10.         <bean:property name="transactionManager" ref="transactionManager" />  
  11.     </bean:bean>  
  12.     <bean:bean id="transactionManager"  
  13.         class="org.springframework.batch.support.transaction.ResourcelessTransactionManager">  
  14.     </bean:bean>  
  15.     <bean:bean id="jobLauncher"  
  16.         class="org.springframework.batch.core.launch.support.SimpleJobLauncher">  
  17.         <bean:property name="jobRepository" ref="jobRepository" />  
  18.     </bean:bean>  
  19.     <job id="ioSampleJob">  
  20.         <step id="step1">  
  21.             <tasklet ref="begin"></tasklet>  
  22.         </step>  
  23.     </job>  
  24.     <bean:bean id="begin" class="com.test.tasklet.MyFirstTasklet" scope="step">  
  25.     </bean:bean>  
  26. </beans:beans>  
这个文件放在src/main/resources文件夹下。

[html] view plaincopy
  1. com.test.tasklet.MyFirstTasklet  
的代码如下:
[java] view plaincopy
  1. /** 
  2.  *  
  3.  */  
  4. package com.test.tasklet;  
  5.   
  6. import org.springframework.batch.core.StepContribution;  
  7. import org.springframework.batch.core.scope.context.ChunkContext;  
  8. import org.springframework.batch.core.step.tasklet.Tasklet;  
  9. import org.springframework.batch.repeat.RepeatStatus;  
  10.   
  11. /** 
  12.  * @author hadoop 
  13.  * 
  14.  */  
  15. public class MyFirstTasklet implements Tasklet {  
  16.   
  17.     /* (non-Javadoc) 
  18.      * @see org.springframework.batch.core.step.tasklet.Tasklet#execute(org.springframework.batch.core.StepContribution, org.springframework.batch.core.scope.context.ChunkContext) 
  19.      */  
  20.     public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)  
  21.             throws Exception {  
  22.           
  23.         for(int i = 0; i < 10; i++)  
  24.         {  
  25.             System.out.println(i);  
  26.         }  
  27.           
  28.         return RepeatStatus.FINISHED;  
  29.     }  
  30.   
  31. }  

main函数:
[java] view plaincopy
  1. /** 
  2.  *  
  3.  */  
  4. package com.test.springbatch;  
  5.   
  6. import org.springframework.batch.core.ExitStatus;  
  7. import org.springframework.batch.core.Job;  
  8. import org.springframework.batch.core.JobExecution;  
  9. import org.springframework.batch.core.JobParametersBuilder;  
  10. import org.springframework.batch.core.JobParametersInvalidException;  
  11. import org.springframework.batch.core.launch.JobLauncher;  
  12. import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;  
  13. import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;  
  14. import org.springframework.batch.core.repository.JobRestartException;  
  15. import org.springframework.context.ApplicationContext;  
  16. import org.springframework.context.support.FileSystemXmlApplicationContext;  
  17.   
  18. /** 
  19.  * @author hadoop 
  20.  * 
  21.  */  
  22. public class AppMain {  
  23.   
  24.     /** 
  25.      * @param args 
  26.      */  
  27.     public static void main(String[] args) {  
  28.         ApplicationContext context;  
  29.         JobParametersBuilder jobPara = new JobParametersBuilder();  //设置文件路径参数  
  30.         context = new FileSystemXmlApplicationContext(new String[]{"classpath:applicationContext-batch-myfirstbatch.xml"});  
  31.         String jobName = "ioSampleJob";  
  32.         Job job = (Job)context.getBean(jobName);  
  33.         JobLauncher launcher = (JobLauncher)context.getBean("jobLauncher");  
  34.         JobExecution result = null;  
  35.         try {  
  36.           result = launcher.run(job, jobPara.toJobParameters());  
  37.         } catch (JobExecutionAlreadyRunningException e) {  
  38.             e.printStackTrace();  
  39.         } catch (JobRestartException e) {  
  40.             e.printStackTrace();  
  41.         } catch (JobInstanceAlreadyCompleteException e) {  
  42.             e.printStackTrace();  
  43.         } catch (JobParametersInvalidException e) {  
  44.             e.printStackTrace();  
  45.         }  
  46.         ExitStatus es = result.getExitStatus();  
  47.         if(es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) //任务正常完成  
  48.         {  
  49.             System.out.println("任务正常完成");  
  50.         }  
  51.         else  
  52.         {  
  53.             System.out.println("任务失败");  
  54.         }  
  55.     }  
  56.   
  57. }  
一个spring batch的入门实例的代码就这么多。
但用到的概念却不多,下面将一一说明。
Job,一个Job就是一个任务;

把log4j的输出级别定义为info级别。


[html] view plaincopy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">  
  3. <log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/'>  
  4.   
  5.     <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">  
  6.     <param name="Target" value="System.out" />  
  7.         <layout class="org.apache.log4j.PatternLayout">  
  8.             <param name="ConversionPattern" value="%p [%c] - %m%n" />  
  9.         </layout>  
  10.         <!--过滤器设置输出的级别 -->  
  11.         <filter class="org.apache.log4j.varia.LevelRangeFilter">  
  12.             <param name="levelMin" value="DEBUG" />  
  13.             <param name="levelMax" value="ERROR" />  
  14.         </filter>  
  15.     </appender>  
  16.       
  17.     <!-- 根logger的设置 -->  
  18.     <root>  
  19.         <priority value="INFO" />  
  20.         <appender-ref ref="CONSOLE"/>  
  21.     </root>  
  22. </log4j:configuration>  

重试的代码示例:


[java] view plaincopy
  1. /** 
  2.  *  
  3.  */  
  4. package com.test.springbatch;  
  5.   
  6. import java.util.HashMap;  
  7. import java.util.Map;  
  8.   
  9. import org.apache.log4j.Logger;  
  10. import org.springframework.batch.retry.RetryCallback;  
  11. import org.springframework.batch.retry.RetryContext;  
  12. import org.springframework.batch.retry.policy.SimpleRetryPolicy;  
  13. import org.springframework.batch.retry.support.RetryTemplate;  
  14.   
  15. /** 
  16.  * @author hadoop 
  17.  * 
  18.  */  
  19. public class RetryTest {  
  20.       
  21.     private static Logger logger = Logger.getLogger(RetryTest.class);  
  22.   
  23.     /** 
  24.      * @param args 
  25.      */  
  26.     public static void main(String[] args) {  
  27.           
  28.         /** 
  29.          * SimpleRetryPolicy策略,重试固定的次数,包括第一次执行; 
  30.          */  
  31.         SimpleRetryPolicy policy = new SimpleRetryPolicy();  
  32.         /** 
  33.          * 最多尝试次数,第一次的序号为0,5表示共尝试5次(包括原始的第一次执行)。 
  34.          */  
  35.         policy.setMaxAttempts(5);  
  36.         Map<class<? extends</class<?  Throwable>,Boolean> retryExceptionMap = new HashMap<class<? extends Throwable>,Boolean>();  </class<? <>
  37.         /** 
  38.          * 设置发生哪些异常时,重试 
  39.          */  
  40.         retryExceptionMap.put(Exception.classtrue);  
  41.         policy.setRetryableExceptions(retryExceptionMap);  
  42.         RetryTemplate template = new RetryTemplate();  
  43.         template.setRetryPolicy(policy);  
  44.         boolean runResult = false;  
  45.         try {  
  46.             runResult = template.execute(new RetryCallback(){  
  47.                   
  48.                 private int count = 0;  
  49.                   
  50.                 public Boolean doWithRetry(RetryContext context) throws Exception {  
  51.                     count++;  
  52.                     if(count < 5)  
  53.                     {  
  54.                         logger.info("exception happen" + count);  
  55.                         throw new Exception("exception happen" + count);  
  56.                     }  
  57.                     logger.info("here" + count);  
  58.                     return true;  
  59.                 }  
  60.             });  
  61.         } catch (Exception e) {  
  62.             e.printStackTrace();  
  63.         }  
  64.         if(runResult)  
  65.         {  
  66.             logger.info("成功");  
  67.         }  
  68.         else  
  69.         {  
  70.             logger.info("失败");  
  71.         }  
  72.     }  
  73.   
  74. }  

设计的情形为,前4次抛出异常,第五次执行成功,执行代码,我们得到如下的输出:



[plain] view plaincopy
  1. INFO [com.test.springbatch.RetryTest] - exception happen1  
  2. INFO [com.test.springbatch.RetryTest] - exception happen2  
  3. INFO [com.test.springbatch.RetryTest] - exception happen3  
  4. INFO [com.test.springbatch.RetryTest] - exception happen4  
  5. INFO [com.test.springbatch.RetryTest] - here5  
  6. INFO [com.test.springbatch.RetryTest] - 成功  

第五次执行成功,符合我们设想的结果。


如果把重试次数改为4,会发生什么呢。


[java] view plaincopy
  1. policy.setMaxAttempts(4);  
执行结果如下:



[plain] view plaincopy
  1. INFO [com.test.springbatch.RetryTest] - exception happen1  
  2. INFO [com.test.springbatch.RetryTest] - exception happen2  
  3. INFO [com.test.springbatch.RetryTest] - exception happen3  
  4. INFO [com.test.springbatch.RetryTest] - exception happen4  
  5. java.lang.Exception: exception happen4  
  6.     at com.test.springbatch.RetryTest$1.doWithRetry(RetryTest.java:55)  
  7.     at com.test.springbatch.RetryTest$1.doWithRetry(RetryTest.java:1)  
  8.     at org.springframework.batch.retry.support.RetryTemplate.doExecute(RetryTemplate.java:240)  
  9.     at org.springframework.batch.retry.support.RetryTemplate.execute(RetryTemplate.java:147)  
  10.     at com.test.springbatch.RetryTest.main(RetryTest.java:46)  
  11. INFO [com.test.springbatch.RetryTest] - 失败  

第4次发生了异常之后,马上就把异常抛给了我们自己定义的异常处理。


代码级的重试给了我们很高的灵活度,把某些比较容易出错的代码放入其中,能很好的增强我们代码的健壮性。


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/9399028/viewspace-1820603/,如需转载,请注明出处,否则将追究法律责任。

下一篇: 征服 Redis + Jedis
请登录后发表评论 登录
全部评论

注册时间:2010-11-16

  • 博文量
    407
  • 访问量
    1733240