详解Spring Batch 轻量级批处理框架实践
什么是Spring Batch?
Spring Batch 是一个轻量级的批处理框架,实现了大规模数据处理任务的管理。它提供了一个可以配置的批处理环境,这使得开发者可以非常容易地编写处理大量数据的作业。
Spring Batch 核心概念
Spring Batch 包含三个核心概念:
- 任务(Job):批处理的一个运行实例,包含多个步骤;
- 任务步骤(Step):Job 中的每个独立步骤,通常包含读取数据、处理数据、写入数据;
- 读取 - 处理 - 写入模式:每个步骤的核心流程。
Spring Batch 实践攻略
- 配置 Spring Batch 运行环境
首先我们需要配置 Spring Batch 运行环境。我们需要添加以下 Maven 依赖:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
使用 Spring Boot 就会自动配置所需的环境。
- 创建 Job 和 Step
我们在 Spring Batch 中需要创建 Job 和 Step。Job 可以包含多个 Step。每个 Step 可以完成读取、处理、写入等任务。我们需要在配置类中定义 Job 和 Step:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private ItemReader<SourceData> reader;
@Autowired
private ItemProcessor<SourceData, TargetData> processor;
@Autowired
private ItemWriter<TargetData> writer;
@Bean
public Job demoJob() {
return jobBuilderFactory.get("demoJob")
.incrementer(new RunIdIncrementer())
.flow(demoStep())
.end()
.build();
}
@Bean
public Step demoStep() {
return stepBuilderFactory.get("demoStep")
.<SourceData, TargetData>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
}
在示例中,我们定义了 Job 和 Step,并通过使用 @Autowired
自动装配了需要使用的 Item 类型。其中,Job 的起始任务是通过 flow()
方法来定义的。
- 实现 ItemReader
在 Spring Batch 中,ItemReader 用于读取数据,可以从文件或数据库中读取。我们需要实现一个 ItemReader 来读取数据。以下是一个示例:
@Component
public class CsvReader implements ItemReader<SourceData> {
private final String filepath = "src/main/resources/data.csv";
private BufferedReader reader;
@PostConstruct
private void init() throws Exception {
File file = new File(filepath);
if (file.exists() && file.isFile()) {
reader = new BufferedReader(new FileReader(file));
} else {
throw new Exception("File not found.");
}
}
@Override
public SourceData read() throws Exception {
String line = reader.readLine();
if (line != null && !line.trim().isEmpty()) {
String[] values = line.split(",");
return new SourceData(values[0], values[1], values[2]);
}
return null;
}
}
在示例中,我们使用 BufferedReader 读取文件,并将其拆分为 SourceData 类型的对象。
- 实现 ItemProcessor
ItemProcessor 用于处理数据,在处理过程中可以修改数据。以下是一个示例:
@Component
public class CsvProcessor implements ItemProcessor<SourceData, TargetData> {
private final String[] HEADERS = {"id", "name", "age"};
@Override
public TargetData process(SourceData sourceData) throws Exception {
if (sourceData == null) return null;
String[] arr = {sourceData.getId(), sourceData.getName(), sourceData.getAge()};
return new TargetData(HEADERS, arr);
}
}
在示例中,我们将 SourceData 转换为 TargetData 对象。
- 实现 ItemWriter
ItemWriter 用于写入数据,可以将数据写入文件或数据库中。以下是一个示例:
@Component
public class CsvWriter implements ItemWriter<TargetData> {
private final String filepath = "src/main/resources/output.csv";
private CSVPrinter printer;
@PostConstruct
private void init() throws Exception {
File file = new File(filepath);
if (!file.exists() || !file.isFile()) {
file.createNewFile();
}
CSVFormat format = CSVFormat.DEFAULT.withHeader();
FileWriter writer = new FileWriter(file, true);
printer = new CSVPrinter(writer, format);
}
@Override
public synchronized void write(List<? extends TargetData> items) throws Exception {
for (TargetData item : items) {
printer.printRecord(item.getArr());
}
printer.flush();
}
}
在示例中,我们使用 CSVPrinter 将数据写入文件中。
示例说明
在以上示例中,我们演示了读取 CSV 文件的过程,然后将其转换为 TargetData 类型,并将其写入 CSV 文件。在具体实践中,开发者可以自行定义 ItemReader、ItemProcessor 和 ItemWriter 来满足不同的需求,例如从数据库中读取数据、对数据进行筛选、将数据写入数据库等。
另外一个示例是,我们可以使用 Spring Batch 从数据库中读取数据。我们可以定义一个 ItemReader 和一个 ItemWriter 来实现以上功能:
@Component
public class UserDbReader implements ItemReader<User> {
@Autowired
private UserRepository userRepository;
@Override
public User read() throws Exception {
List<User> list = userRepository.findAll();
if (list == null || list.isEmpty()) {
return null;
}
return list.remove(0);
}
}
@Component
public class UserDbWriter implements ItemWriter<User> {
@Autowired
private UserRepository userRepository;
@Override
public void write(List<? extends User> items) throws Exception {
for (User item : items) {
userRepository.save(item);
}
}
}
在示例中,我们使用 UserRepository 从数据库中读取 User 数据,并使用 UserRepository 写入 User 数据。
总结
Spring Batch 提供了非常便捷的批处理环境,帮助开发者非常容易地实现大规模数据处理任务。在具体实践中,我们需要使用 Spring Batch 核心概念,定义 Job 和 Step,并实现 ItemReader、ItemProcessor 和 ItemWriter。在以上示例中,我们演示了读取文件和从数据库中读取数据的过程,并将其写入文件或数据库。开发者可以在自己的项目中灵活地使用 Spring Batch 来满足不同的需求。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:详解Spring Batch 轻量级批处理框架实践 - Python技术站