SpringBoot用多线程批量导入数据库实现方法

下面是 Spring Boot 用多线程批量导入数据库实现方法的详细攻略。

1. 背景介绍

在实际的软件开发过程中,数据导入操作是一个非常常见的需求。如果数据比较少的时候,通过单线程导入是能够满足需求的。但是如果数据量很大时,单线程导入会非常慢,可能需要几个小时或者几天的时间才能完成。

因此,如果我们能够使用多线程技术来进行批量导入,就可以大大提高导入效率,缩短导入时间。

2. 实现步骤

2.1 创建数据表

在本次示例中,我们使用一个用户表 user 来进行演示。

用户表结构:

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL COMMENT '用户ID',
  `username` varchar(50) NOT NULL COMMENT '用户名',
  `email` varchar(50) DEFAULT NULL COMMENT '电子邮箱',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

2.2 准备数据

为了方便演示,我们使用 faker 库生成一些随机的用户数据。

@Value("${batch.data-count}")
private int dataCount;

@Bean(name = "users")
public BlockingQueue<User> users() {
    BlockingQueue<User> users = new LinkedBlockingDeque<>(dataCount);
    Faker faker = new Faker(Locale.CHINA);
    for (int i = 0; i < dataCount; i++) {
        users.add(new User().setUsername(faker.name().username())
                             .setEmail(faker.internet().emailAddress())
                             .setCreateTime(new Date())
                             .setUpdateTime(new Date()));
    }
    return users;
}

2.3 定义导入任务

接下来,我们需要定义一个导入任务,其中包含两个主要的步骤:数据导入和数据校验。

数据导入步骤如下:

@Transactional(rollbackFor = Exception.class)
public void insertBatch(List<User> users) {
    log.info("insertBatch start, size={}", users.size());
    userDao.insertBatch(users);
    log.info("insertBatch end, size={}", users.size());
}

数据校验步骤如下:

public void validate(List<User> expected, List<User> actual) throws Exception {
    log.info("validate start, expected={}, actual={}", expected.size(), actual.size());
    assertEquals(expected.size(), actual.size());
    Map<Long, User> expectedMap = expected.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    Map<Long, User> actualMap = actual.stream().collect(Collectors.toMap(User::getId, Function.identity()));
    expectedMap.forEach((id, ex) -> {
        User ac = actualMap.get(id);
        assertEquals(ex, ac);
    });
    log.info("validate end, expected={}, actual={}", expected.size(), actual.size());
}

2.4 定义多线程任务及线程池

为了提高导入效率,我们可以使用多线程技术进行批量导入。具体实现思路如下:

  1. 将数据划分成多个分片,每个分片大约包含 1000 条数据。
  2. 每个分片使用不同的线程来进行导入。
  3. 等待所有分片导入完成后,验证导入结果。

具体实现如下:

@Autowired
private UserDao userDao;

@Value("${batch.thread-count}")
private int threadCount;

@Autowired
@Qualifier("users")
private BlockingQueue<User> users;

@Autowired
private Validator validator;

@Async
@Scheduled(fixedDelay = 1000L)
public void batchInsertTask() throws Exception {
    log.info("batchInsertTask start, threadCount={}, dataCount={}", threadCount, users.size());
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    AtomicInteger counter = new AtomicInteger();
    List<User> allExpects = Lists.newArrayList();
    while (true) {
        List<User> batch = Lists.newArrayListWithCapacity(1000);
        for (int i = 0; i < 1000; i++) {
            User user = users.poll();
            if (user != null) {
                batch.add(user);
            }
        }
        if (CollectionUtils.isEmpty(batch)) {
            break;
        }
        List<User> expects = Lists.newArrayList(batch);
        allExpects.addAll(expects);

        executorService.execute(() -> {
            try {
                insertBatch(batch);
            } catch (Exception e) {
                log.error("batchInsertTask error", e);
            } finally {
                counter.incrementAndGet();
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

    List<User> allActuals = userDao.listAll();
    validate(allExpects, allActuals);

    log.info("batchInsertTask end, threadCount={}, dataCount={}, counter={}", threadCount, allActuals.size(), counter.get());
}

2.5 配置文件设置

最后,我们需要在 application.yml 配置文件中设置数据源信息和多线程导入的参数信息。

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update

batch:
  data-count: 1000000
  thread-count: 10

其中,batch.data-count 表示总数据量,batch.thread-count 表示使用的线程数量。

3. 示例演示

为了验证多线程批量导入数据库的效果,我们编写了一个测试用例来模拟实际场景。

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApplicationTest {

    @Autowired
    private BlockingQueue<User> users;

    @Autowired
    private UserDao userDao;

    @Autowired
    private Validator validator;

    @Test
    public void testBatchInsert() throws Exception {
        long start = System.currentTimeMillis();
        List<User> expects = Lists.newArrayList(users);
        Application.applicationContext.getBean(Application.class).batchInsertTask();
        List<User> actuals = userDao.listAll();
        validator.validate(expects, actuals);
        long end = System.currentTimeMillis();
        log.info("testBatchInsert time={}", (end - start) / 1000);
    }

}

在运行测试用例后,控制台输出的执行时间为:testBatchInsert time=16,即批量导入 100 万条数据仅用了 16 秒钟。

4. 总结

通过本次示例演示,我们学习了 Spring Boot 如何通过多线程的方式实现批量导入数据的效果。通过多线程的方式导入数据,能够大幅度缩短数据导入时间,提高工作效率,是软件开发过程中非常实用的技术手段。同时,在实际使用过程中,我们需要注意线程数量的选择、数据划分的合理性和数据校验等方面。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot用多线程批量导入数据库实现方法 - Python技术站

(0)
上一篇 2023年5月18日
下一篇 2023年5月18日

相关文章

  • 浅谈运维工程师的前景和职业发展

    浅谈运维工程师的前景和职业发展 什么是运维工程师 运维工程师是指负责公司的系统、网络、数据库等基础设施的运行和维护的工程师。其主要工作包括:系统运维、网络运维、数据库运维、安全运维等方面。 运维工程师的前景 随着互联网的快速发展,云计算、大数据等技术的迅猛发展,各行各业对于运维工程师的需求日益增加。特别的,在数字化时代背景下,互联网行业的企业对运维人员的需求…

    database 2023年5月19日
    00
  • MySQL数据库使用规范总结

    MySQL数据库使用规范总结 MySQL作为一个开源的关系型数据库管理系统,使用广泛。但是,为了保证MySQL的安全性和性能,需要遵循一定的使用规范。本文将从以下几个方面详细讲解MySQL数据库使用规范。 数据库设计规范 表的设计:表的设计需要符合第一范式、第二范式和第三范式,也就是每个字段只存储一个值,表中的每个字段都应该和主键关联,表中的每个非主键字段都…

    database 2023年5月22日
    00
  • MyBatis中模糊查询使用CONCAT(‘%’,#{str},’%’)出错的解决

    首先,MyBatis中模糊查询使用CONCAT(‘%’,#{str},’%’)是比较常见的一种方式,但是在实际应用中,如果不注意一些细节,就容易出现错误。 问题现象:当使用如下代码时,查询结果为空: <select id="findByNameLike" parameterType="java.lang.String&qu…

    database 2023年5月22日
    00
  • Linq to SQL Delete时遇到问题的解决方法

    Linq to SQL Delete时遇到问题的解决方法 在使用Linq to SQL进行删除操作时,我们可能会遇到一些问题。这篇攻略将介绍在Linq to SQL Delete时遇到问题的解决方法。 问题描述 当我们使用Linq to SQL进行删除操作时,可能会出现以下情况: 当我们在DataContext中直接使用DeleteOnSubmit方法进行删…

    database 2023年5月21日
    00
  • Oracle和MariaDB的区别

    Oracle和MariaDB的区别 1. 什么是Oracle、MariaDB Oracle是一种商业数据库管理系统,用于数据存储、处理和管理。Oracle数据库最初是由Oracle公司创建的。 MariaDB是MySQL软件(又被称为 MySQL 分支)的一个分支,是一个开源关系型数据库管理系统,也被认为是一个强大的MySQL替代品。 2. 发展历史 Ora…

    database 2023年3月27日
    00
  • MySQL性能优化 出题业务SQL优化

    MySQL性能优化 出题业务SQL优化是一项非常重要的工作,能够有效提高网站的响应速度和用户体验,下面是一些具体步骤供参考: 第一步:确定问题 在进行任何优化之前,你首先需要确定问题所在。一些常见的MySQL性能问题包括响应时间过长、内存使用过高、查询慢等。你可以通过各种工具来分析MySQL运行状态,如SHOW STATUS、EXPLAIN等。 第二步:优化…

    database 2023年5月19日
    00
  • Suse Linux 10中MySql安装与配置步骤

    下面是详细的攻略: Suse Linux 10中MySQL安装与配置步骤 安装MySQL 打开终端,使用以下命令安装MySQL: sudo zypper install mysql 安装完毕后,使用以下命令启动MySQL服务: sudo service mysql start 配置MySQL 使用以下命令登录MySQL服务器: mysql -u root -…

    database 2023年5月22日
    00
  • GO实现Redis:GO实现TCP服务器(1)

    本文实现一个Echo TCP Server interface/tcp/Handler.go type Handler interface { Handle(ctx context.Context, conn net.Conn) Close() error } Handler:业务逻辑的处理接口 Handle(ctx context.Context, con…

    2023年4月10日
    00
合作推广
合作推广
分享本页
返回顶部