SpringBoot用多线程批量导入数据库实现方法

yizhihongxing

下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。

1. 背景介绍

在实际的软件开发过程中,数据导入操作是一个非常常见的需求。如果数据比较少的时候,通过单线程导入是能够满足需求的。但是如果数据量很大时,单线程导入会非常慢,可能需要几个小时或者几天的时间才能完成。

因此,如果我们能够使用多线程技术来进行批量导入,就可以大大提高导入效率,缩短导入时间。

2. 实现步骤

2.1 创建数据表

在本次示例中,我们使用一个用户表 user 来进行演示。

用户表结构:

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL COMMENT '用户ID',
  `username` varchar(50) NOT NULL COMMENT '用户名',
  `email` varchar(50) DEFAULT NULL COMMENT '电子邮箱',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

2.2 准备数据

为了方便演示,我们使用 faker 库生成一些随机的用户数据。

@Value("${batch.data-count}")
private int dataCount;

@Bean(name = "users")
public BlockingQueue<User> users() {
    BlockingQueue<User> users = new LinkedBlockingDeque<>(dataCount);
    Faker faker = new Faker(Locale.CHINA);
    for (int i = 0; i < dataCount; i++) {
        users.add(new User().setUsername(faker.name().username())
                             .setEmail(faker.internet().emailAddress())
                             .setCreateTime(new Date())
                             .setUpdateTime(new Date()));
    }
    return users;
}

2.3 定义导入任务

接下来,我们需要定义一个导入任务,其中包含两个主要的步骤:数据导入和数据校验。

数据导入步骤如下:

@Transactional(rollbackFor = Exception.class)
public void insertBatch(List<User> users) {
    log.info("insertBatch start, size={}", users.size());
    userDao.insertBatch(users);
    log.info("insertBatch end, size={}", users.size());
}

数据校验步骤如下:

public void validate(List<User> expected, List<User> actual) throws Exception {
    log.info("validate start, expected={}, actual={}", expected.size(), actual.size());
    assertEquals(expected.size(), actual.size());
    Map<Long, User> expectedMap = expected.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    Map<Long, User> actualMap = actual.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    expectedMap.forEach((id, ex) -> {
        User ac = actualMap.get(id);
        assertEquals(ex, ac);
    });
    log.info("validate end, expected={}, actual={}", expected.size(), actual.size());
}

2.4 定义多线程任务及线程池

为了提高导入效率,我们可以使用多线程技术进行批量导入。具体实现思路如下:

  1. 将数据划分成多个分片,每个分片大约包含 1000 条数据。
  2. 每个分片使用不同的线程来进行导入。
  3. 等待所有分片导入完成后,验证导入结果。

具体实现如下:

@Autowired
private UserDao userDao;

@Value("${batch.thread-count}")
private int threadCount;

@Autowired
@Qualifier("users")
private BlockingQueue<User> users;

@Autowired
private Validator validator;

@Async
@Scheduled(fixedDelay = 1000L)
public void batchInsertTask() throws Exception {
    log.info("batchInsertTask start, threadCount={}, dataCount={}", threadCount, users.size());
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    AtomicInteger counter = new AtomicInteger();
    List<User> allExpects = Lists.newArrayList();
    while (true) {
        List<User> batch = Lists.newArrayListWithCapacity(1000);
        for (int i = 0; i < 1000; i++) {
            User user = users.poll();
            if (user != null) {
                batch.add(user);
            }
        }
        if (CollectionUtils.isEmpty(batch)) {
            break;
        }
        List<User> expects = Lists.newArrayList(batch);
        allExpects.addAll(expects);

        executorService.execute(() -> {
            try {
                insertBatch(batch);
            } catch (Exception e) {
                log.error("batchInsertTask error", e);
            } finally {
                counter.incrementAndGet();
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

    List<User> allActuals = userDao.listAll();
    validate(allExpects, allActuals);

    log.info("batchInsertTask end, threadCount={}, dataCount={}, counter={}", threadCount, allActuals.size(), counter.get());
}

2.5 配置文件设置

最后,我们需要在 application.yml 配置文件中设置数据源信息和多线程导入的参数信息。

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update

batch:
  data-count: 1000000
  thread-count: 10

其中,batch.data-count 表示总数据量,batch.thread-count 表示使用的线程数量。

3. 示例演示

为了验证多线程批量导入数据库的效果,我们编写了一个测试用例来模拟实际场景。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApplicationTest {

    @Autowired
    private BlockingQueue<User> users;

    @Autowired
    private UserDao userDao;

    @Autowired
    private Validator validator;

    @Test
    public void testBatchInsert() throws Exception {
        long start = System.currentTimeMillis();
        List<User> expects = Lists.newArrayList(users);
        Application.applicationContext.getBean(Application.class).batchInsertTask();
        List<User> actuals = userDao.listAll();
        validator.validate(expects, actuals);
        long end = System.currentTimeMillis();
        log.info("testBatchInsert time={}", (end - start) / 1000);
    }

}

在运行测试用例后,控制台输出的执行时间为:testBatchInsert time=16,即批量导入 100 万条数据仅用了 16 秒钟。

4. 总结

通过本次示例演示,我们学习了 Spring Boot 如何通过多线程的方式实现批量导入数据的效果。通过多线程的方式导入数据,能够大幅度缩短数据导入时间,提高工作效率,是软件开发过程中非常实用的技术手段。同时,在实际使用过程中,我们需要注意线程数量的选择、数据划分的合理性和数据校验等方面。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot用多线程批量导入数据库实现方法 - Python技术站

(0)
上一篇 2023年5月18日
下一篇 2023年5月18日

相关文章

  • oracle中利用关键字rownum查询前20名员工信息及rownum用法

    关键字ROWNUM是Oracle中非常常用的一种方式来限定查询的结果集,它可以返回一个表中满足某种条件的前N行,我们可以利用它来查询前N名员工信息。下面是具体的步骤: 创建员工表 我们先创建一个员工表,表中包含员工编号、姓名、所属部门、工资等字段,以便后续查询操作。创建表的SQL语句如下: CREATE TABLE employee( emp_id NUMB…

    database 2023年5月21日
    00
  • Redis批量删除Key的三种方式小结

    下面是关于 Redis 批量删除 Key 的三种方式的详细讲解。 方式一:使用命令行删除 第一种方式是通过 Redis 命令行执行删除操作。我们可以使用 keys 命令来查找所有符合特定模式的 Key,然后使用 del 命令批量删除这些 Key。例如: redis> keys user:* 1) "user:1" 2) "…

    database 2023年5月22日
    00
  • MongoDB查询文档使用方法(详解版)

    MongoDB是一款NoSQL数据库,使用它进行查询文档与关系型数据库有较大的区别,下文将带大家了解MongoDB查询文档的完整方法。 首先,我们需要安装MongoDB,接着选择一种适合自己的编程语言,这里选择Python为例。 连接MongoDB 连接MongoDB需要用到pymongo库,如果您还没安装,可以通过以下命令进行安装: $ pip3 inst…

    MongoDB 2023年3月14日
    00
  • spring boot项目application.properties文件存放及使用介绍

    介绍 application.properties是SpringBoot项目中常用的一种配置文件,可以用来定义项目的各种属性值,其中包括:数据库链接信息、各种组件的属性以及其他一些自定义属性值等等。本文将对application.properties的存放位置、使用方法以及示例进行详细的介绍。 存放位置 在一个SpringBoot项目中,applicatio…

    database 2023年5月18日
    00
  • SQL Server数据库损坏检测以及SQL Server数据库修复的解决方法

    下面是关于SQL Server数据库损坏检测以及修复的一些完整攻略: SQL Server数据库损坏检测 方法1:运行数据库检查工具 SQL Server 有一个内置的数据库检查工具,可以帮助检测数据库文件的完整性。可以通过以下步骤运行此工具: 使用 SQL Server Management Studio 连接到相应的 SQL Server 实例。 在 O…

    database 2023年5月21日
    00
  • postgresql数据库配置文件postgresql.conf,pg_hba.conf,pg_ident.conf

    PostgreSQL是一款功能强大的关系型数据库,它的配置文件主要有postgresql.conf、pg_hba.conf以及pg_ident.conf三个。其中postgresql.conf是PostgreSQL的主要配置文件,它提供了大量可配置的选项用来控制数据库系统的行为,pg_hba.conf和pg_ident.conf则主要关注在安全性方面。 下面…

    database 2023年5月22日
    00
  • oracle锁表该如何解决

    当出现oracle锁表的情况时,我们需要尽快解决该问题,避免影响业务正常运行。下面是解决oracle锁表的完整攻略: 1.查看锁定情况 在Oracle中,我们可以通过以下两个方式查看当前锁定情况:- 使用Oracle自带的视图V$LOCKED_OBJECT查看当前被锁定的对象及锁类型 SELECT OBJECT_NAME, SESSION_ID, LOCKE…

    database 2023年5月21日
    00
  • Windows系统中完全卸载MySQL数据库实现重装mysql

    下面是完整攻略: 1. 停止MySQL服务 首先,需要停止正在运行的MySQL服务。可以在命令行窗口中输入以下命令实现停止服务: net stop mysql 2. 卸载MySQL 在控制面板中找到“程序和功能”选项,找到MySQL进行卸载。如果没有通过安装程序安装MySQL,可以直接删除MySQL的安装目录。 3. 删除MySQL相关文件 在卸载MySQL…

    database 2023年5月22日
    00
合作推广
合作推广
分享本页
返回顶部