记一次Flink遇到性能瓶颈

yizhihongxing

前言

这周的主要时间花在Flink上面,做了一个简单的从文本文件中读取数据,然后存入数据库的例子,能够正常的实现功能,但是遇到个问题,我有四台机器,自己搭建了一个standalone的集群,不论我把并行度设置多少,跑起来的耗时都非常接近,实在是百思不得其解。机器多似乎并不能帮助它。 把过程记录在此,看后面随着学习的深入能不能解答出这个问题。
image

尝试过的修复方法

集群搭建

出现这个问题后,我从集群的角度来进行了些修改,
1,机器是2核的,slots被设置成了6,那我就有点怀疑是这个设置问题,因为其实只有2核,设置的多了,反而存在抢占资源,导致运行达不到效果,改成2后效果一样,没有改进。这个参数在
taskmanager.numberOfTaskSlots: 2
2,调整内存, taskmanager 从2G调整为4G, 效果也没有变化。
taskmanager.memory.process.size: 4000m
这里说下这个内存,我们设置的是总的Memory,也就是这个Total Process Memory。
image
剔除掉些比较固定的Memory,剩下的大头就是这个Task Heap 和 Managed Memory。
所以我们调整大小后,它两个也就相应的增加了。 我查了下这两个,可以理解为堆内存和堆外内存,
一个是存放我们程序的对象,会被垃圾回收器回收;一个是堆外内存,比如RockDB 和 缓存 sort,hash 等的中间结果。

程序方面修改

最开始的时候我把保存数据库操作写在MapFunction里面,后来改到SinkFunction里面。
SinkFunction里面保存数据库的方法也进行了反复修改,从开始使用Spring的JdbcTemplate,换成后来直接使用最原始JDBC。 而且还踩了一个坑,开始的时候用的注入的JdbcTemplate, 本地运行没有问题,到了集群上面,发到别的机器的时候,注入的东西就是空的了。
换成原始的JDBC速度能提升不少, 我猜想这里的原因是jdbctemplate做了些多余的事情, JDBC打开一次,后面Invoke的时候就直接存了,效率要高些,所以速度上提升不少。
这里把部分代码贴出来, 在Open的时候就预加载好PreparedStatement, Invoke的时候直接传参数,调用就可以了。

public class SinkToMySQL2 extends RichSinkFunction<MarketPrice> {
    private PreparedStatement updatePS;
    private PreparedStatement insertPS;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        HikariDataSource dataSource = new HikariDataSource();
        connection = getConnection(dataSource);
        if(connection != null)
        {
            String updateSQL = " update MarketPrice set open_price=?,high_price=?,low_price=?,close_price=? where performance_id = ? and price_as_of_date = ?";
            updatePS = this.connection.prepareStatement(updateSQL);

            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
        if (updatePS != null) {
            updatePS.close();
        }
        if (insertPS != null) {
            insertPS.close();
        }
        //关闭连接和释放资源
        if (connection != null) {
            connection.close();
        }

    }

    /**
     * 每条数据的插入都要调用一次 invoke() 方法
     *
     * @param marketPrice
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MarketPrice marketPrice, Context context) throws Exception {

        log.info("start save for {}", marketPrice.getPerformanceId().toString() );

        updatePS.setDouble(1,marketPrice.getOpenPrice());
        updatePS.setDouble(2,marketPrice.getHighPrice());
        updatePS.setDouble(3,marketPrice.getLowPrice());
        updatePS.setDouble(4,marketPrice.getClosePrice());
        updatePS.setString(5, marketPrice.getPerformanceId().toString());
        updatePS.setInt(6, marketPrice.getPriceAsOfDate());
        int result = updatePS.executeUpdate();


        log.info("finish update for {} result {}", marketPrice.getPerformanceId().toString(), result);

        if(result == 0)
        {
            String insertSQL = " insert into MarketPrice(performance_id,price_as_of_date,open_price,high_price,low_price,close_price) values (?,?,?,?,?,?)";
            insertPS = this.connection.prepareStatement(insertSQL);
            insertPS.setString(1, marketPrice.getPerformanceId().toString());
            insertPS.setInt(2, marketPrice.getPriceAsOfDate());
            insertPS.setDouble(3,marketPrice.getOpenPrice());
            insertPS.setDouble(4,marketPrice.getHighPrice());
            insertPS.setDouble(5,marketPrice.getLowPrice());
            insertPS.setDouble(6,marketPrice.getClosePrice());

            result = insertPS.executeUpdate();
            log.info("finish save for {} result {}", marketPrice.getPerformanceId().toString(), result);
        }
    }

}

总结

从多个方面去改进,结果发现还是一样的,就是使用一台机器和使用三台机器,时间上一样的,再怀疑我只能怀疑是某台机器有问题,然后运行的时候,由最慢的机器决定了速度。 我在使用MapFunction的时候有观察到,有的时候,某台机器已经处理上千条,而有的只处理了几十条,到最后完成的时候,大家处理的数量又是很接近的。这样能够解释为什么机器多了,速度却是一样的。但是我没有办法找出哪台机器来。 我自己的本地运行,并行数设置的多,速度上面是有提升的,到了集群就碰到这样的现象,后面看能不能解决它, 先记录在此。

原文链接:https://www.cnblogs.com/dk168/p/17322082.html

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:记一次Flink遇到性能瓶颈 - Python技术站

(0)
上一篇 2023年4月17日
下一篇 2023年4月17日

相关文章

  • Java中注解的工作原理

    下面是Java中注解的工作原理的完整攻略。 什么是Java注解 Java注解是一种元数据机制,其本质是为了给Java程序提供更好的描述、配置和使用方式的一种注解技术。注解可以被限定用于某些类型、方法、字段或方法参数等Java程序中的特定部分,通过注解可以传递一定的元数据信息,例如对应的某个方法的功能、某个属性的值或某个参数的约束等等。 注解在Java程序中的…

    Java 2023年5月20日
    00
  • JDBC下Idea添加mysql-jar包的详细过程

    JDBC是Java语言操作关系型数据库的标准API,目前已经成为了Java中最流行的访问数据库的方式之一,因此在开发Java应用程序时,经常需要使用JDBC操作数据库。 而在使用Idea开发Java应用程序时,需要添加mysql-jar包才能够操作MySQL数据库。以下是JDBC下Idea添加mysql-jar包的详细过程: 下载mysql-jar包 首先,…

    Java 2023年6月16日
    00
  • Spring Data JPA实现审计功能过程详解

    Spring Data JPA实现审计功能过程详解 Spring Data JPA是Spring Data家族中的一员,是对JPA的封装和增强,大大简化了开发中JPA的使用。其中,Spring Data JPA提供了审计功能,帮助我们记录实体对象的新增、修改、删除操作的时间和操作人。本文就来详细讲解Spring Data JPA如何实现审计功能。 什么是审计…

    Java 2023年6月2日
    00
  • Spring 异步接口返回结果的四种方式

    下面详细讲解Spring异步接口返回结果的四种方式。 1. 使用Callable Spring提供了一个非常简洁的方式来处理异步请求,即使用Java 5中引入的Callable接口。可以使用返回Callable的Controller方法来处理异步请求,Spring会将Callable提交到任务执行器中执行,然后将结果写入响应体中。 示例代码: @RestCo…

    Java 2023年5月31日
    00
  • 这么优雅的Java ORM没见过吧!

    首先,我们需要了解Java ORM的概念。ORM(Object Relational Mapping)是指对象关系映射,是一种将面向对象的程序与关系型数据库之间进行数据转换的技术。Java中有很多ORM框架,如Hibernate、MyBatis、JPA等,它们可以帮助开发者更加方便、高效地访问数据库。 接下来,我们来了解一款优雅的Java ORM框架——Jo…

    Java 2023年5月20日
    00
  • Spring Boot+微信小程序开发平台保存微信登录者的个人信息

    这里提供一份完整的“Spring Boot + 微信小程序开发平台保存微信登录者的个人信息”的攻略,下面将分为以下几个方面进行讲解。 1. 小程序登录流程 在小程序中,用户登录的流程如下: 用户进入小程序,点击登录按钮。 微信端会弹出授权窗口,提示用户是否授权小程序登录。 用户点击同意授权后,微信将会返回一个 code 值给小程序端。 小程序端通过 code…

    Java 2023年6月3日
    00
  • Java基础篇_有关接口和抽象类的几道练习题(分享)

    这里是Java基础篇_有关接口和抽象类的几道练习题(分享)的完整攻略。 一、介绍 该篇文章主要介绍了Java中接口和抽象类的使用方法及练习题。通过练习题的实例,让读者更好的掌握接口和抽象类的编写方法和应用场景。 二、接口 1. 接口的定义 接口是一个抽象的概念,是一组方法的集合。在Java中,接口的定义使用关键字interface来表示。 public in…

    Java 2023年5月26日
    00
  • springsecurity 基本使用详解

    下面我来详细讲解一下“springsecurity 基本使用详解”的完整攻略。 Spring Security 基本使用详解 什么是 Spring Security Spring Security 是针对 Spring 框架的安全性认证框架。也是 Spring Boot 应用中最常用的安全框架之一。它提供了全面的安全性解决方案,以保护应用程序的各个方面,从身…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部