转自: https://www.cnblogs.com/javastack/p/17653257.html
前言
概念词就不多说了,我简单地介绍下,spring batch 是一个方便使用的较健全的批处理框架。
为什么说是方便使用的?
因为这是基于spring的一个框架,接入简单、易理解、流程分明。
为什么说是较健全的?
因为它提供了往常我们在对大批量数据进行处理时需要考虑到的:日志跟踪、事务粒度调配、可控执行、失败机制、重试机制、数据读写等。
业务场景
从实现的业务场景来说,有以下两个:
- 从 csv文件 读取数据,进行业务处理再存储
- 从 数据库 读取数据,进行业务处理再存储
也就是平时经常遇到的数据清理或者数据过滤,又或者是数据迁移备份等等。大批量的数据,自己实现分批处理需要考虑的东西太多了,又不放心,那么使用 Spring Batch 框架是一个很好的选择。
核心组件介绍
首先,在进入实例教程前,我们看看这次的实例里,我们使用springboot 整合spring batch 框架,要编码的东西有什么:
- JobRepository:job的注册/存储器
- JobLauncher:job的执行器
- Job:job任务,包含一个或多个Step
- Step:包含(ItemReader、ItemProcessor和ItemWriter)
- ItemReader:数据读取器
- ItemProcessor:数据处理器
- ItemWriter:数据输出器
可能大家看到这个,是不是多多少少想起来定时任务框架?确实有那么点像,但是我必须在这告诉大家,这是一个批处理框架,不是一个scheduling框架。但是前面提到它提供了可执行控制,也就是说,啥时候执行是可控的,那么显然就是自己可以进行扩展结合定时任务框架,实现你心中所想。
实战:CSV文件处理
准备工作
首先准备一个数据库,里面建一张简单的表,用于实例数据的写入存储或者说是读取等等。
bloginfo表
1 2 3 4 5 6 7 8
| CREATE TABLE `bloginfo` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客作者标识', `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客链接', `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客标题', `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客栏目', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
|
Maven依赖
pom文件里的核心依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency>
<dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.0.7.Final</version> </dependency>
<dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.0.0</version> </dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.18</version> </dependency>
|
配置文件
yml文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| spring: batch: job: enabled: false initialize-schema: always
datasource: druid: username: root password: root url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull driver-class-name: com.mysql.cj.jdbc.Driver initialSize: 5 minIdle: 5 maxActive: 20 maxWait: 60000 timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false poolPreparedStatements: true maxPoolPreparedStatementPerConnectionSize: 20 useGlobalDataSourceStat: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 server: port: 8665
|
POJO层
BlogInfo.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
public class BlogInfo {
private Integer id; private String blogAuthor; private String blogUrl; private String blogTitle; private String blogItem;
@Override public String toString() { return "BlogInfo{" + "id=" + id + ", blogAuthor='" + blogAuthor + '\'' + ", blogUrl='" + blogUrl + '\'' + ", blogTitle='" + blogTitle + '\'' + ", blogItem='" + blogItem + '\'' + '}'; }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getBlogAuthor() { return blogAuthor; }
public void setBlogAuthor(String blogAuthor) { this.blogAuthor = blogAuthor; }
public String getBlogUrl() { return blogUrl; }
public void setBlogUrl(String blogUrl) { this.blogUrl = blogUrl; }
public String getBlogTitle() { return blogTitle; }
public void setBlogTitle(String blogTitle) { this.blogTitle = blogTitle; }
public String getBlogItem() { return blogItem; }
public void setBlogItem(String blogItem) { this.blogItem = blogItem; } }
|
Mapper层
BlogMapper.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import com.example.batchdemo.pojo.BlogInfo; import org.apache.ibatis.annotations.*; import java.util.List; import java.util.Map;
@Mapper public interface BlogMapper { @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem ) VALUES ( #{blogAuthor}, #{blogUrl},#{blogTitle},#{blogItem}) ") @Options(useGeneratedKeys = true, keyProperty = "id") int insert(BlogInfo bloginfo);
@Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}") List<BlogInfo> queryInfoById(Map<String , Integer> map);
}
|
批处理配置类
创建一个配置类 MyBatchConfig.java,里面包含所有批处理相关的组件配置。
首先在类前加入注解:
@Configuration:用于告诉spring,咱们这个类是一个自定义配置类,里面很多bean都需要加载到spring容器里面@EnableBatchProcessing:开启批处理支持
JobRepository
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Bean public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDatabaseType("mysql"); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDataSource(dataSource); return jobRepositoryFactoryBean.getObject(); }
|
JobLauncher
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Bean public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{ SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager)); return jobLauncher; }
|
Job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Bean public Job myJob(JobBuilderFactory jobs, Step myStep){ return jobs.get("myJob") .incrementer(new RunIdIncrementer()) .flow(myStep) .end() .listener(myJobListener()) .build(); }
|
JobListener
对于Job的运行,是可以配置监听器的:
1 2 3 4 5 6 7 8
|
@Bean public MyJobListener myJobListener(){ return new MyJobListener(); }
|
创建自定义监听器 MyJobListener.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public class MyJobListener implements JobExecutionListener {
private Logger logger = LoggerFactory.getLogger(MyJobListener.class);
@Override public void beforeJob(JobExecution jobExecution) { logger.info("job 开始, id={}",jobExecution.getJobId()); }
@Override public void afterJob(JobExecution jobExecution) { logger.info("job 结束, id={}",jobExecution.getJobId()); } }
|
ItemReader
从CSV文件读取数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
@Bean public ItemReader<BlogInfo> reader(){ FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("static/bloginfo.csv")); reader.setLineMapper(new DefaultLineMapper<BlogInfo>() { { setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"}); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() { { setTargetType(BlogInfo.class); } }); } }); return reader; }
|
对于数据读取器 ItemReader,我们给它安排了一个读取监听器,创建 MyReadListener.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public class MyReadListener implements ItemReadListener<BlogInfo> {
private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
@Override public void beforeRead() { }
@Override public void afterRead(BlogInfo item) { }
@Override public void onReadError(Exception ex) { try { logger.info(format("%s%n", ex.getMessage())); } catch (Exception e) { e.printStackTrace(); } } }
|
ItemProcessor
数据处理器,是我们自定义的,里面主要是包含我们对数据处理的业务逻辑,并且我们设置了一些数据校验器,我们这里使用 JSR-303的Validator来作为校验器。
1 2 3 4 5 6 7 8 9 10 11
|
@Bean public ItemProcessor<BlogInfo, BlogInfo> processor(){ MyItemProcessor myItemProcessor = new MyItemProcessor(); myItemProcessor.setValidator(myBeanValidator()); return myItemProcessor; }
|
创建 MyItemProcessor.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import com.example.batchdemo.pojo.BlogInfo; import org.springframework.batch.item.validator.ValidatingItemProcessor; import org.springframework.batch.item.validator.ValidationException;
public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> { @Override public BlogInfo process(BlogInfo item) throws ValidationException {
super.process(item);
if (item.getBlogItem().equals("springboot")) { item.setBlogTitle("springboot 系列还请看看我Jc"); } else { item.setBlogTitle("未知系列"); } return item; } }
|
创建校验器 MyBeanValidator.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import org.springframework.batch.item.validator.ValidationException; import org.springframework.batch.item.validator.Validator; import org.springframework.beans.factory.InitializingBean; import javax.validation.ConstraintViolation; import javax.validation.Validation; import javax.validation.ValidatorFactory; import java.util.Set;
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {
private javax.validation.Validator validator;
@Override public void validate(T value) throws ValidationException {
Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); if (constraintViolations.size() > 0) { StringBuilder message = new StringBuilder(); for (ConstraintViolation<T> constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage() + "\n"); } throw new ValidationException(message.toString()); } }
@Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); }
}
|
ItemWriter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
@Bean public ItemWriter<BlogInfo> writer(DataSource dataSource){ JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>()); String sql = "insert into bloginfo "+" (blogAuthor,blogUrl,blogTitle,blogItem) " +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; }
|
同样对于数据输出器 ItemWriter,我们给它也安排了一个输出监听器,创建 MyWriteListener.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import com.example.batchdemo.pojo.BlogInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ItemWriteListener; import java.util.List; import static java.lang.String.format;
public class MyWriteListener implements ItemWriteListener<BlogInfo> { private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
@Override public void beforeWrite(List<? extends BlogInfo> items) { }
@Override public void afterWrite(List<? extends BlogInfo> items) { }
@Override public void onWriteError(Exception exception, List<? extends BlogInfo> items) { try { logger.info(format("%s%n", exception.getMessage())); for (BlogInfo message : items) { logger.info(format("Failed writing BlogInfo : %s", message.toString())); }
} catch (Exception e) { e.printStackTrace(); }
} }
|
Step配置
ItemReader、ItemProcessor、ItemWriter,这三个小组件到这里,我们都实现了,那么接下来就是把这三个小组件跟我们的step去绑定起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Bean public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader, ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){ return stepBuilderFactory .get("myStep") .<BlogInfo, BlogInfo>chunk(65000) .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2) .listener(new MyReadListener()) .processor(processor) .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2) .listener(new MyWriteListener()) .build(); }
|
触发Job
我们通过接口去触发这个批处理事件,新建一个Controller,TestController.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@RestController public class TestController { @Autowired SimpleJobLauncher jobLauncher;
@Autowired Job myJob;
@GetMapping("testJob") public void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); jobLauncher.run(myJob, jobParameters);
} }
|
实战:数据库处理
从数据库表内读取数据进行处理输出到新的表里面。基于我们上边的整合,我们已经实现了:
1 2 3 4 5 6 7 8 9 10 11
| JobRepository job的注册/存储器 JobLauncher job的执行器 Job job任务,包含一个或多个Step Step 包含(ItemReader、ItemProcessor和ItemWriter) ItemReader 数据读取器 ItemProcessor 数据处理器 ItemWriter 数据输出器 job 监听器 reader 监听器 writer 监听器 process 数据校验器
|
对于一个新的业务场景,从csv文件读取数据转换到数据库表读取数据,我们重新新建的有:
- 数据读取器:原先使用的是
FlatFileItemReader,我们现在改为使用 MyBatisCursorItemReader - 数据处理器:新的场景,业务为了好扩展,所以我们处理器最好也新建一个
- 数据输出器:新的场景,业务为了好扩展,所以我们数据输出器最好也新建一个
- step的绑定设置:新的场景,业务为了好扩展,所以我们step最好也新建一个
- Job:当然是要重新写一个了
新的数据处理器
创建 MyItemProcessorNew.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.springframework.batch.item.validator.ValidatingItemProcessor; import org.springframework.batch.item.validator.ValidationException;
public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> { @Override public BlogInfo process(BlogInfo item) throws ValidationException {
super.process(item);
Integer authorId= Integer.valueOf(item.getBlogAuthor()); if (authorId<20000) { item.setBlogTitle("这是都是小于20000的数据"); } else if (authorId>20000 && authorId<30000){ item.setBlogTitle("这是都是小于30000但是大于20000的数据"); }else { item.setBlogTitle("旧书不厌百回读"); } return item; } }
|
新的配置组件
写在MyBatchConfig类里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
@Bean public Job myJobNew(JobBuilderFactory jobs, Step stepNew){ return jobs.get("myJobNew") .incrementer(new RunIdIncrementer()) .flow(stepNew) .end() .listener(myJobListener()) .build();
}
@Bean public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew, ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){ return stepBuilderFactory .get("stepNew") .<BlogInfo, BlogInfo>chunk(65000) .reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10) .listener(new MyReadListener()) .processor(processorNew) .writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2) .listener(new MyWriteListener()) .build();
}
@Bean public ItemProcessor<BlogInfo, BlogInfo> processorNew(){ MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew(); csvItemProcessor.setValidator(myBeanValidator()); return csvItemProcessor; }
@Autowired private SqlSessionFactory sqlSessionFactory;
@Bean @StepScope
public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {
System.out.println("开始查询数据库");
MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();
reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");
reader.setSqlSessionFactory(sqlSessionFactory); Map<String , Object> map = new HashMap<>();
map.put("authorId" , Integer.valueOf(authorId)); reader.setParameterValues(map); return reader; }
@Bean public ItemWriter<BlogInfo> writerNew(DataSource dataSource){ JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>()); String sql = "insert into bloginfonew "+" (blogAuthor,blogUrl,blogTitle,blogItem) " +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)"; writer.setSql(sql); writer.setDataSource(dataSource); return writer; }
|
新的触发接口
新写一个接口来执行新的这个job:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Autowired SimpleJobLauncher jobLauncher;
@Autowired Job myJobNew;
@GetMapping("testJobNew") public void testJobNew(@RequestParam("authorId") String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis()) .addString("authorId",authorId) .toJobParameters(); jobLauncher.run(myJobNew,jobParametersNew);
}
|
关键技术点
Chunk机制
Spring Batch 提供了事务的控制,重启,检测跳过等等机制。那么,这些东西的实现,很多都在于这个step环节的设置。
首先看到我们代码出现的第一个设置,chunk( 6500 ),Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作。
没错,对于整个step环节,就是数据的读取,处理最后到输出。这个chunk机制里,我们传入的 6500,也就是是告诉它,读取处理数据,累计达到 6500条进行一次批次处理,去执行写入操作。这个传值,是根据具体业务而定,可以是500条一次,1000条一次,也可以是20条一次,50条一次。
Retry和Skip机制
在我们大量数据处理,不管是读取或者说是写入,都肯定会涉及到一些未知或者已知因素导致某条数据失败了。那么如果说咱们啥也不设置,失败一条数据,那么我们就当作整个失败了?。显然这个太不人性,所以spring batch 提供了 retry 和 skip 两个设置(其实还有restart),通过这两个设置来人性化地解决一些数据操作失败场景。
1
| retryLimit(3).retry(Exception.class)
|
这个就是设置重试,当出现异常的时候,重试多少次。我们设置为3,也就是说当一条数据操作失败,那我们会对这条数据进行重试3次,还是失败就是当做失败了,那么我们如果有配置skip(推荐配置使用),那么这个数据失败记录就会留到给 skip 来处理。
1
| skip(Exception.class).skipLimit(2)
|
skip,跳过,也就是说我们如果设置3,那么就是可以容忍 3条数据的失败。只有达到失败数据达到3次,我们才中断这个step。对于失败的数据,我们做了相关的监听器以及异常信息记录,供与后续手动补救。
常见问题
Druid连接池问题
问题描述:使用druid连接池时,MyBatisCursorItemReader 会报数据库功能不支持的错误。
原因:MyBatisCursorItemReader,druid 数据库连接池不支持。
解决方案:
- 注释掉druid连接池 jar依赖
- yml里替换连接池配置,或者不配置其他连接池
其实我们不配置其他连接池,springboot 2.X 版本已经为我们整合了默认的连接池 HikariCP。
在Springboot2.X版本,数据库的连接池官方推荐使用HikariCP。如果不是为了druid的那些后台监控数据,sql分析等等,完全是优先使用HikariCP的。
官方的原话:
We prefer HikariCP for its performance and concurrency. If HikariCP is available, we always choose it.
翻译:
我们更喜欢 HikariCP 的性能和并发性。如果有 HikariCP,我们总是选择它。
如果我们想显式配置HikariCP,可以:
1 2 3 4 5 6 7 8
| spring: datasource: hikari: minimum-idle: 5 maximum-pool-size: 20 connection-timeout: 60000 idle-timeout: 600000 max-lifetime: 1800000
|
总结
Spring Boot 整合 Spring Batch 批处理框架,提供了完善的批处理能力:
- 完善的事务管理:通过chunk机制实现批量事务控制
- 容错机制:支持retry和skip,保证批处理任务的健壮性
- 监听器机制:可以对Job、Reader、Writer等各个环节进行监控
- 数据校验:支持JSR-303数据校验规范
- 灵活的数据源:支持文件、数据库等多种数据源
Spring Batch 适用于以下场景:
- 大批量数据迁移
- 数据清理和过滤
- 定期数据备份
- 报表数据生成
- 系统间数据同步
参考链接