Spring Batch 批处理框架使用介绍

Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch 是 Spring 的一个子项目,使用 Java 语言并基于 Spring 框架开发,使得已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务。

Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。对于大数据量和高性能的批处理任务,Spring Batch 同样提供了高级功能和特性来支持,比如分区功能、远程功能。总之,通过 Spring Batch 能够支持简单的、复杂的和大数据量的批处理作业。

Spring Batch 是一个批处理应用框架,不是调度框架,但需要和调度框架合作来构建完整的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。如果需要使用调度框架,在商业软件和开源软件中已经有很多优秀的企业级调度框架(如 Quartz、Tivoli、Control-M、Cron 等)可以使用。

1.现在实现Spring Batch方法,首先是配置:

/**
 * Spring Batch infrastructure configuration. Declares the job repository,
 * the job launcher and the job listener as beans.
 */
@Configuration
@EnableBatchProcessing
public class SpringBatchConfiguration {

    @Resource
    private DataSource dataSource;

    @Resource
    private PlatformTransactionManager transactionManager;

    /**
     * Job repository bean.
     *
     * @return the repository produced by a factory bean configured with the
     *         injected data source and transaction manager and the database
     *         type {@code "mysql"}
     * @throws Exception propagated from the factory bean's {@code getObject()}
     */
    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setDatabaseType("mysql");
        factory.setTransactionManager(transactionManager);
        return factory.getObject();
    }

    /**
     * Job launcher bean.
     *
     * @return a {@code SimpleJobLauncher} wired to {@link #jobRepository()}
     * @throws Exception propagated from {@link #jobRepository()}
     */
    @Bean
    public SimpleJobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository());
        return jobLauncher;
    }

    /**
     * Job listener bean.
     *
     * @return a new {@code TestJobListener}
     */
    @Bean
    public TestJobListener testJobListener() {
        return new TestJobListener();
    }

}

2.TestJobListener监听器

/**
 * Job listener that logs when a job starts and, when it ends, the number of
 * milliseconds elapsed since the start was recorded.
 */
public class TestJobListener implements JobExecutionListener {

    private final Logger log = Logger.getLogger(this.getClass().getName());

    /**
     * Timestamp in milliseconds captured by the most recent
     * {@link #beforeJob(JobExecution)} call. The value is held per listener
     * instance, so every {@code beforeJob} call overwrites it.
     */
    private long startedAtMillis = 0L;

    /**
     * Records the current time and logs the start marker.
     *
     * @param jobExecution the execution about to start (not read here)
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startedAtMillis = System.currentTimeMillis();
        log.info(">>job start");
    }

    /**
     * Logs the end marker together with the elapsed milliseconds.
     *
     * @param jobExecution the execution that finished (not read here)
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        final long elapsedMillis = System.currentTimeMillis() - startedAtMillis;
        log.info(">>job end(" + elapsedMillis + ")");
    }
}

3.读操作代码

/**
 * Flat-file reader for {@code BikeKey} items whose input resource and line
 * mapper are supplied together through {@link #setData(String, LineMapper)}.
 */
public class TestItemReader2 extends FlatFileItemReader<BikeKey> {

    /**
     * Configures where to read from and how each line is mapped.
     *
     * @param path       file-system path of the file to read
     * @param lineMapper mapper applied to the lines that are read
     */
    public void setData(String path, LineMapper<BikeKey> lineMapper) {
        FileSystemResource source = new FileSystemResource(path);
        setResource(source);
        setLineMapper(lineMapper);
    }

}

//LineMapper代码
/**
 * Maps one comma-separated line of the import file to a {@code BikeKey}.
 * The second field of the line becomes the key serial number.
 */
@Component
public class TestItemLineMapper implements LineMapper<BikeKey> {

    /**
     * Builds a {@code BikeKey} from a single line.
     *
     * @param s the raw line
     * @param i the line number passed in by the caller
     * @return a new {@code BikeKey} with a {@code null} id, status {@code 0}
     *         and the line's second field as its key serial number
     * @throws IllegalArgumentException if the line has fewer than two
     *                                  comma-separated fields
     */
    @Override
    public BikeKey mapLine(String s, int i) throws Exception {
        System.out.println("mapLine..."+s+" i:"+i);

        String[] fields = s.split(",");
        // Fail with a message that names the offending line instead of a bare
        // ArrayIndexOutOfBoundsException when the serial-number field is missing.
        if (fields.length < 2) {
            throw new IllegalArgumentException(
                    "Line " + i + " has no key serial number field: \"" + s + "\"");
        }

        // Create the BikeKey object; the id is left null and the status starts at 0.
        BikeKey bikeKey = new BikeKey();
        bikeKey.setId(null);
        bikeKey.setStatus(0);
        bikeKey.setKeySn(fields[1]);
        return bikeKey;
    }
}

4.写操作代码

/**
 * Item writer that inserts every {@code BikeKey} of a chunk through the
 * injected {@code IBikeKeyService}, one call per item.
 */
@Component
public class TestItemWriter implements ItemWriter<BikeKey> {

    @Resource
    private IBikeKeyService bikeKeyService;

    /**
     * Prints a trace line for each item and inserts it.
     *
     * @param list the items of the current chunk
     * @throws Exception declared by the {@code ItemWriter} contract
     */
    @Override
    public void write(List<? extends BikeKey> list) throws Exception {
        int position = 0;
        for (BikeKey bikeKey : list) {
            System.out.println("write..." + list.size() + " i:" + position);
            bikeKeyService.insert(bikeKey);
            position++;
        }
    }
}

5.处理过程代码

/**
 * Item processor that stamps each {@code BikeKey} with a creation time
 * before it is handed on.
 */
@Component
public class TestItemProcessor implements ItemProcessor<BikeKey,BikeKey> {

    /**
     * Sets the key creation time to {@code DateUtil.getTime() / 1000},
     * narrowed to {@code int}, and returns the same instance.
     * NOTE(review): presumably {@code DateUtil.getTime()} returns epoch
     * milliseconds, making the stored value epoch seconds; confirm.
     *
     * @param bikeKey the item to stamp
     * @return the same {@code bikeKey} instance
     * @throws Exception declared by the {@code ItemProcessor} contract
     */
    @Override
    public BikeKey process(BikeKey bikeKey) throws Exception {
        System.out.println("process...");
        final int createdAt = (int) (DateUtil.getTime() / 1000);
        bikeKey.setKeyCreateTime(createdAt);
        return bikeKey;
    }
}

6.工作统一调用代码

/**
 * Builds and launches the bike-key import job: a single chunk-oriented step
 * that reads a file, runs each item through the processor and writes it.
 */
@Component
public class TestDoImport {

    /** Source file used by {@link #doImport()} when no path is given. */
    private static final String DEFAULT_DATA_PATH = "d:/bike_key.csv";

    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(TestDoImport.class.getName());

    @Resource
    private JobLauncher jobLauncher;

    @Resource
    private JobBuilderFactory jobBuilderFactory;

    @Resource
    private StepBuilderFactory stepBuilderFactory;

    @Resource
    private TestJobListener jobListener;

    @Resource
    private TestItemLineMapper lineMapper;

    /** Write stage. */
    @Resource
    private TestItemWriter writer;

    /** Processing stage. */
    @Resource
    private TestItemProcessor processor;

    /**
     * Runs the import against the default data file.
     */
    public void doImport() {
        doImport(DEFAULT_DATA_PATH);
    }

    /**
     * Runs the import against the given file. Launch failures are logged and
     * not rethrown.
     *
     * @param path file-system path of the file to import
     */
    public void doImport(String path) {
        // Job parameters: a TIME value taken from the current clock, so it
        // differs between launches (presumably so that every launch counts as
        // a new job instance; confirm against the job repository semantics).
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("TIME", System.currentTimeMillis())
                .toJobParameters();

        // The reader is created per call and kept local, so calls do not share
        // reader state through this component.
        TestItemReader2 reader = new TestItemReader2();
        reader.setData(path, lineMapper);

        try {
            Step step = getStep(stepBuilderFactory, reader, writer, processor);
            jobLauncher.run(getJob(jobBuilderFactory, step), jobParameters);
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            // Same best-effort contract as before (no rethrow), but the
            // failure goes to the log together with its cause.
            LOGGER.log(java.util.logging.Level.SEVERE,
                    "Failed to launch the import job for " + path, e);
        }
    }

    /**
     * Builds the job named {@code importJob} around a single step.
     *
     * @param jobs factory used to create the job builder
     * @param step the only step of the job
     * @return the built job, with the job listener attached
     */
    private Job getJob(JobBuilderFactory jobs, Step step) {
        return jobs
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .listener(jobListener) // observes the whole job run
                .build();
    }

    /**
     * Builds the step named {@code step1} with a chunk size of 1.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param reader             read stage
     * @param writer             write stage
     * @param processor          processing stage
     * @return the built step
     */
    private Step getStep(StepBuilderFactory stepBuilderFactory, ItemReader<BikeKey> reader, ItemWriter<BikeKey> writer, ItemProcessor<BikeKey,BikeKey> processor) {
        return stepBuilderFactory
                .get("step1")
                .<BikeKey,BikeKey> chunk(1)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

 

运行日志:

23-May-2018 13:42:03.817 信息 [http-nio-8080-exec-6] com.mymvc.system.batch.listener.TestJobListener.beforeJob >>job start
mapLine…,a0011 i:1
process…
write…1 i:0
mapLine…,a0022 i:2
process…
write…1 i:0
mapLine…,a0033 i:3
process…
write…1 i:0
mapLine…,a0044 i:4
process…
write…1 i:0
mapLine…,a0055 i:5
process…
write…1 i:0
mapLine…,a0016 i:6
process…
write…1 i:0
mapLine…,a0027 i:7
process…
write…1 i:0
mapLine…,a0038 i:8
process…
write…1 i:0
mapLine…,a0049 i:9
process…
write…1 i:0
mapLine…,a00510 i:10
process…
write…1 i:0
mapLine…,a00111 i:11
process…
write…1 i:0
mapLine…,a00212 i:12
process…
write…1 i:0
mapLine…,a00313 i:13
process…
write…1 i:0
mapLine…,a00414 i:14
process…
write…1 i:0
mapLine…,a00515 i:15
process…
write…1 i:0
mapLine…,a00116 i:16
process…
write…1 i:0
mapLine…,a00217 i:17
process…
write…1 i:0
mapLine…,a00318 i:18
process…
write…1 i:0
mapLine…,a00419 i:19
process…
write…1 i:0
mapLine…,a00520 i:20
process…
write…1 i:0
23-May-2018 13:42:05.252 信息 [http-nio-8080-exec-6] com.mymvc.system.batch.listener.TestJobListener.afterJob >>job end(1435)

 

 

 

 

 

Leave a Comment

 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |