请允许我给您详细讲解“Spring Batch批处理框架”的完整攻略。
什么是Spring Batch?
Spring Batch是Spring官方提供的一个用于大规模处理数据任务的框架。它能够对大量数据进行统一标准化集中处理,适用于许多任务,如大批量数据的ETL(Extract-Transform-Load),数据清理,报表生成等。基于Spring Batch,我们可以开发大规模数据处理的应用程序。
Spring Batch的核心概念
Job
Job代表批处理任务,它由一系列的Step组成。我们可以通过Job来控制整个批处理任务的执行,例如启动一个新的Job实例、设置Job参数、监控Job执行情况等。
Step
一个Job可以包含多个Step,每个Step代表批处理任务的一部分,比如数据的读取、数据处理、数据输出等。Step包含一个ItemReader、一个ItemProcessor和一个ItemWriter,它们是Step的三个核心组件。
ItemReader
ItemReader负责从数据源中读取数据,一次读取一条数据。Spring Batch提供了一些内置的ItemReader实现,比如FlatFileItemReader,JdbcCursorItemReader等。
ItemProcessor
ItemProcessor负责对读取到的数据进行处理,实现数据的转换、清理、合并等操作。ItemProcessor是可选的组件,如果不需要对数据进行处理,可以省略该组件。
ItemWriter
ItemWriter负责将处理后的数据输出到目标数据源中。Spring Batch提供了一些内置的ItemWriter实现,比如FlatFileItemWriter,JdbcBatchItemWriter等。
Spring Batch的使用步骤
第一步:创建Job
创建Job需要实现Job接口并实现其execute方法。在execute方法中定义要执行的Step和执行顺序。示例如下:
@Configuration
@EnableBatchProcessing
public class MyJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
public MyJobConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public Job myJob() {
return jobBuilderFactory.get("myJob")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(new PersonItemReader())
.processor(new PersonItemProcessor())
.writer(new PersonItemWriter())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<String, String>chunk(10)
.reader(new MessageItemReader())
.processor(new MessageItemProcessor())
.writer(new MessageItemWriter())
.build();
}
}
第二步:定义ItemReader
定义ItemReader需要实现ItemReader接口并重写其read方法。示例如下:
public class PersonItemReader implements ItemReader<Person> {
private List<Person> persons = new ArrayList<>();
public PersonItemReader() {
persons.add(new Person("张三", 20));
persons.add(new Person("李四", 30));
persons.add(new Person("王五", 40));
}
@Override
public Person read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!persons.isEmpty()) {
return persons.remove(0);
}
return null;
}
}
第三步:定义ItemProcessor
定义ItemProcessor需要实现ItemProcessor接口并重写其process方法。示例如下:
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Override
public Person process(Person person) throws Exception {
person.setAge(person.getAge() + 10);
return person;
}
}
第四步:定义ItemWriter
定义ItemWriter需要实现ItemWriter接口并重写其write方法。示例如下:
public class PersonItemWriter implements ItemWriter<Person> {
@Override
public void write(List<? extends Person> persons) throws Exception {
for (Person person : persons) {
System.out.println(person.getName() + " " + person.getAge());
}
}
}
第五步:启动批处理任务
可以通过JobLauncher的run方法启动批处理任务。示例如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyJobConfigTest {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job myJob;
@Test
public void testJob() throws Exception {
jobLauncher.run(myJob, new JobParameters());
}
}
以上就是使用Spring Batch的基本流程。
示例1:从文件中读取并处理数据
第一步:定义ItemReader
定义一个能够从文件中读取数据的ItemReader。示例如下:
public class FileItemReader implements ItemReader<String> {
private final String path;
private BufferedReader bufferedReader;
private boolean exhausted;
public FileItemReader(String path) {
this.path = path;
}
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (bufferedReader == null) {
bufferedReader = new BufferedReader(new FileReader(path));
}
String line = bufferedReader.readLine();
if (line == null) {
exhausted = true;
}
return line;
}
@PreDestroy
public void close() throws IOException {
if (bufferedReader != null) {
bufferedReader.close();
}
}
public boolean isExhausted() {
return exhausted;
}
}
第二步:定义ItemProcessor
定义一个能够处理读取到的数据的ItemProcessor。示例如下:
public class UpperCaseItemProcessor implements ItemProcessor<String, String> {
@Override
public String process(String s) throws Exception {
return s.toUpperCase();
}
}
第三步:定义ItemWriter
定义一个能够将处理后的数据写入文件的ItemWriter。示例如下:
public class FileItemWriter implements ItemWriter<String> {
private final String path;
private BufferedWriter bufferedWriter;
public FileItemWriter(String path) {
this.path = path;
}
@Override
public void write(List<? extends String> list) throws Exception {
if (bufferedWriter == null) {
bufferedWriter = new BufferedWriter(new FileWriter(path));
}
for (String s : list) {
bufferedWriter.write(s);
bufferedWriter.newLine();
}
bufferedWriter.flush();
}
@PreDestroy
public void close() throws IOException {
if (bufferedWriter != null) {
bufferedWriter.close();
}
}
}
第四步:创建Step
创建一个包含FileItemReader、UpperCaseItemProcessor和FileItemWriter的Step。示例如下:
@Bean
public Step fileToUpperCaseStep() {
return stepBuilderFactory.get("fileToUpperCaseStep")
.<String, String>chunk(10)
.reader(new FileItemReader("input.txt"))
.processor(new UpperCaseItemProcessor())
.writer(new FileItemWriter("output.txt"))
.build();
}
第五步:创建Job并启动批处理任务
创建一个Job并包含刚才创建的Step,然后启动批处理任务。示例如下:
@Bean
public Job myJob() {
return jobBuilderFactory.get("myJob")
.start(fileToUpperCaseStep())
.build();
}
@Test
public void testFileToUpperCaseJob() throws Exception {
JobExecution jobExecution = jobLauncher.run(myJob(), new JobParameters());
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
执行完毕后,output.txt文件中的文本将全部变成大写。
示例2:从数据库中读取并处理数据
第一步:定义ItemReader
定义一个能够从JDBC数据源中读取数据的ItemReader。示例如下:
@Bean
public JdbcCursorItemReader<Customer> customerItemReader() {
JdbcCursorItemReader<Customer> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT id, name, age FROM customer");
reader.setRowMapper((resultSet, i) -> new Customer(
resultSet.getInt("id"),
resultSet.getString("name"),
resultSet.getInt("age")));
return reader;
}
第二步:定义ItemProcessor
定义一个能够处理读取到的数据的ItemProcessor。示例如下:
@Bean
public ItemProcessor<Customer, Customer> averageAgeItemProcessor() {
return item -> {
int age = item.getAge();
if (age >= 0 && age <= 17) {
item.setAgeRange("0-17");
} else if (age >= 18 && age <= 29) {
item.setAgeRange("18-29");
} else if (age >= 30 && age <= 39) {
item.setAgeRange("30-39");
} else if (age >= 40 && age <= 49) {
item.setAgeRange("40-49");
} else {
item.setAgeRange("50+");
}
return item;
};
}
第三步:定义ItemWriter
定义一个能够将处理后的数据写入到另一个数据库表中的ItemWriter。示例如下:
@Bean
public ItemWriter<Customer> customerItemWriter() {
JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("INSERT INTO customer_processed (id, name, age, age_range) VALUES (:id, :name, :age, :ageRange)");
return writer;
}
第四步:创建Step
创建一个包含customerItemReader、averageAgeItemProcessor和customerItemWriter的Step。示例如下:
@Bean
public Step customerProcessingStep() {
return stepBuilderFactory.get("customerProcessingStep")
.<Customer, Customer>chunk(10)
.reader(customerItemReader())
.processor(averageAgeItemProcessor())
.writer(customerItemWriter())
.build();
}
第五步:创建Job并启动批处理任务
创建一个Job并包含刚才创建的Step,然后启动批处理任务。示例如下:
@Bean
public Job myJob() {
return jobBuilderFactory.get("myJob")
.start(customerProcessingStep())
.build();
}
@Test
public void testCustomerProcessingJob() throws Exception {
JobExecution jobExecution = jobLauncher.run(myJob(), new JobParameters());
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
执行完毕后,数据将被处理并写入到customer_processed表中。
以上就是关于Spring Batch批处理框架的完整攻略以及两个示例的介绍,感谢您的耐心阅读。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spring batch批处理框架 - Python技术站