SpringBoot用多线程批量导入数据库实现方法

下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。

1. 背景介绍

在实际的软件开发过程中,数据导入操作是一个非常常见的需求。如果数据比较少的时候,通过单线程导入是能够满足需求的。但是如果数据量很大时,单线程导入会非常慢,可能需要几个小时或者几天的时间才能完成。

因此,如果我们能够使用多线程技术来进行批量导入,就可以大大提高导入效率,缩短导入时间。

2. 实现步骤

2.1 创建数据表

在本次示例中,我们使用一个用户表 user 来进行演示。

用户表结构:

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL COMMENT '用户ID',
  `username` varchar(50) NOT NULL COMMENT '用户名',
  `email` varchar(50) DEFAULT NULL COMMENT '电子邮箱',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

2.2 准备数据

为了方便演示,我们使用 faker 库生成一些随机的用户数据。

@Value("${batch.data-count}")
private int dataCount;

@Bean(name = "users")
public BlockingQueue<User> users() {
    BlockingQueue<User> users = new LinkedBlockingDeque<>(dataCount);
    Faker faker = new Faker(Locale.CHINA);
    for (int i = 0; i < dataCount; i++) {
        users.add(new User().setUsername(faker.name().username())
                             .setEmail(faker.internet().emailAddress())
                             .setCreateTime(new Date())
                             .setUpdateTime(new Date()));
    }
    return users;
}

2.3 定义导入任务

接下来,我们需要定义一个导入任务,其中包含两个主要的步骤:数据导入和数据校验。

数据导入步骤如下:

@Transactional(rollbackFor = Exception.class)
public void insertBatch(List<User> users) {
    log.info("insertBatch start, size={}", users.size());
    userDao.insertBatch(users);
    log.info("insertBatch end, size={}", users.size());
}

数据校验步骤如下:

public void validate(List<User> expected, List<User> actual) throws Exception {
    log.info("validate start, expected={}, actual={}", expected.size(), actual.size());
    assertEquals(expected.size(), actual.size());
    Map<Long, User> expectedMap = expected.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    Map<Long, User> actualMap = actual.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    expectedMap.forEach((id, ex) -> {
        User ac = actualMap.get(id);
        assertEquals(ex, ac);
    });
    log.info("validate end, expected={}, actual={}", expected.size(), actual.size());
}

2.4 定义多线程任务及线程池

为了提高导入效率,我们可以使用多线程技术进行批量导入。具体实现思路如下:

  1. 将数据划分成多个分片,每个分片大约包含 1000 条数据。
  2. 每个分片使用不同的线程来进行导入。
  3. 等待所有分片导入完成后,验证导入结果。

具体实现如下:

@Autowired
private UserDao userDao;

@Value("${batch.thread-count}")
private int threadCount;

@Autowired
@Qualifier("users")
private BlockingQueue<User> users;

@Autowired
private Validator validator;

@Async
@Scheduled(fixedDelay = 1000L)
public void batchInsertTask() throws Exception {
    log.info("batchInsertTask start, threadCount={}, dataCount={}", threadCount, users.size());
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    AtomicInteger counter = new AtomicInteger();
    List<User> allExpects = Lists.newArrayList();
    while (true) {
        List<User> batch = Lists.newArrayListWithCapacity(1000);
        for (int i = 0; i < 1000; i++) {
            User user = users.poll();
            if (user != null) {
                batch.add(user);
            }
        }
        if (CollectionUtils.isEmpty(batch)) {
            break;
        }
        List<User> expects = Lists.newArrayList(batch);
        allExpects.addAll(expects);

        executorService.execute(() -> {
            try {
                insertBatch(batch);
            } catch (Exception e) {
                log.error("batchInsertTask error", e);
            } finally {
                counter.incrementAndGet();
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

    List<User> allActuals = userDao.listAll();
    validate(allExpects, allActuals);

    log.info("batchInsertTask end, threadCount={}, dataCount={}, counter={}", threadCount, allActuals.size(), counter.get());
}

2.5 配置文件设置

最后,我们需要在 application.yml 配置文件中设置数据源信息和多线程导入的参数信息。

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update

batch:
  data-count: 1000000
  thread-count: 10

其中,batch.data-count 表示总数据量,batch.thread-count 表示使用的线程数量。

3. 示例演示

为了验证多线程批量导入数据库的效果,我们编写了一个测试用例来模拟实际场景。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApplicationTest {

    @Autowired
    private BlockingQueue<User> users;

    @Autowired
    private UserDao userDao;

    @Autowired
    private Validator validator;

    @Test
    public void testBatchInsert() throws Exception {
        long start = System.currentTimeMillis();
        List<User> expects = Lists.newArrayList(users);
        Application.applicationContext.getBean(Application.class).batchInsertTask();
        List<User> actuals = userDao.listAll();
        validator.validate(expects, actuals);
        long end = System.currentTimeMillis();
        log.info("testBatchInsert time={}", (end - start) / 1000);
    }

}

在运行测试用例后,控制台输出的执行时间为:testBatchInsert time=16,即批量导入 100 万条数据仅用了 16 秒钟。

4. 总结

通过本次示例演示,我们学习了 Spring Boot 如何通过多线程的方式实现批量导入数据的效果。通过多线程的方式导入数据,能够大幅度缩短数据导入时间,提高工作效率,是软件开发过程中非常实用的技术手段。同时,在实际使用过程中,我们需要注意线程数量的选择、数据划分的合理性和数据校验等方面。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot用多线程批量导入数据库实现方法 - Python技术站

(0)
上一篇 2023年5月18日
下一篇 2023年5月18日

相关文章

  • GraalVM native-image编译后quarkus的超音速启动

    下面是“GraalVM native-image编译后quarkus的超音速启动”的攻略。 1. 什么是GraalVM native-image? GraalVM native-image是GraalVM的一个重要特性,能够将Java应用程序编译成本地可执行文件。这样做的优势是可以极大地提高应用程序的启动速度和运行效率。 2. 为什么要使用GraalVM n…

    database 2023年5月21日
    00
  • SQL通用存储过程分页,支持多表联合

    SQL通用存储过程分页是指能够在多表联合查询时,进行通用的分页查询操作。这种分页操作可以应用于多种数据库类型,如MySQL、Oracle、SQL Server等。下面将详细讲解如何进行SQL通用存储过程分页。 1.创建存储过程 创建一个名为Paging的存储过程。在存储过程中,使用了一些重要的参数,如表名、排序列、第几页、每页行数。下面是实现代码。 CREA…

    database 2023年5月22日
    00
  • 浅谈数据库索引的作用及原理

    浅谈数据库索引的作用及原理 简介 在数据库查询时,查询的数据量可能很大,甚至可能有成千上万的记录。若没有索引,数据库查询必将使得查询的速度变得极慢,系统处理效率也会变得很低。因此,对于数据库而言,建立索引可以快速定位需要查询的数据,提高查询速度的同时还可以降低数据库的负载。 索引的定义 索引是一种数据结构,它可以提高数据的查询速度。 索引通常是一个表(或视图…

    database 2023年5月19日
    00
  • mysql自动断开该连接解决方案

    作者: MySQL 的默认设置下,当一个连接的空闲时间超过8小时后,MySQL 就会断开该连接,而 c3p0 连接池则以为该被断开的连接依然有效。在这种情况下,如果客户端代码向 c3p0 连接池请求连接的话,连接池就会把已经失效的连接返回给客户端,客户端在使用该失效连接的时候即抛出异常 复制代码代码如下: <bean /> <!–othe…

    MySQL 2023年4月13日
    00
  • 深入浅析mybatis oracle BLOB类型字段保存与读取

    深入浅析MyBatis Oracle BLOB类型字段的保存与读取 概述 在使用MyBatis操作Oracle数据库过程中,我们可能会遇到BLOB类型字段的保存和读取问题。BLOB类型字段通常用于存储大型二进制数据,比如图片、音频、视频等。如何使用MyBatis操作BLOB类型字段是一个需要仔细思考的问题。 本文将介绍如何使用MyBatis进行Oracle数…

    database 2023年5月21日
    00
  • MySQL常见的脚本语句格式参考指南

    MySQL常见的脚本语句格式参考指南 1. 为什么需要脚本语句的格式参考指南? 在编写MySQL脚本时,较为规范的脚本格式可以让脚本更易于理解、维护和修改,而不规范的脚本格式则会给别人查看、修改脚本带来更大的困难。因此,脚本的格式参考指南是非常重要的。 2. MySQL常见的脚本语句格式参考指南 2.1 语句之间的分号 在MySQL中,每个语句应该以分号(;…

    database 2023年5月21日
    00
  • Linux中设置Redis开机启动的方法

    下面我将为您详细讲解“Linux中设置Redis开机启动的方法”的完整攻略,以下是具体步骤: 1. 编写Redis启动脚本 在/etc/init.d/目录下新建一个名为redis的文件,这个文件就是我们的启动脚本,使用以下命令: sudo vim /etc/init.d/redis 然后把以下代码粘贴进去: #!/bin/sh # chkconfig: 23…

    database 2023年5月22日
    00
  • 深入分析PHP优化及注意事项

    深入分析PHP优化及注意事项 PHP是一种流行的服务器端编程语言,然而,在应用程序较大而复杂的情况下,它的性能可能会受到影响。在本篇文章中,我将介绍一些PHP优化技术和注意事项,帮助你更好地提升PHP应用程序的性能。 1. 使用OPcache OPcache是一个免费的开源PHP缓存扩展,可以在服务器端缓存并预编译PHP脚本。OPcache能够避免每次请求时…

    database 2023年5月21日
    00
合作推广
合作推广
分享本页
返回顶部