Flink JobGraph生成源码解析

下面是详细讲解“Flink JobGraph生成源码解析”的完整攻略。

什么是Flink JobGraph

Flink JobGraph是Apache Flink的一个重要模块,它描述了一个Flink任务的数据流和操作。在Flink任务启动时,JobGraph会被构建出来,并提交到JobManager进行执行。

JobGraph的生成流程

Flink JobGraph的生成流程可以分为以下几个步骤:

  1. 用户编写Flink程序代码
  2. Flink编译器将用户编写的代码转换成一个DAG(有向无环图),该DAG描述了Flink程序中的数据流和操作。
  3. Flink编译器将DAG转换成一个JobGraph对象,该对象描述了Flink任务的数据流和操作。
  4. JobGraph通过网络传输到JobManager,并由JobManager进行任务调度和执行。

其中,第2步和第3步是比较关键的,下面我们将详细解析这两个步骤的流程。

Flink程序转换成DAG

当用户编写Flink程序时,实际上是在创建数据流。Flink中的数据流可以看做是一系列的逻辑算子(Operator)组成的有向无环图(DAG)。每个算子代表一个数据处理操作,每个算子之间通过边连接,传递数据流。

下面是一个简单的Flink程序示例,该程序将输入的字符串分割成单词,并统计单词出现的次数:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("localhost", 9999)
    .flatMap((String line, Collector<String> out) -> {
        for (String word: line.split(" ")) {
            out.collect(word);
        }
    })
    .keyBy(word -> word)
    .sum(1)
    .print();

上述程序中,通过socketTextStream方法创建了一个数据源,flatMap方法对数据进行单词分割,keyBy方法对单词进行分组,sum方法对单词出现的次数进行统计,最终通过print方法输出到控制台。

Flink编译器在编译该程序时,会分析程序中的逻辑算子,生成对应的Operator实例,并将这些Operator连接成一个DAG,如下图所示:

socketTextStream -> flatMap -> keyBy -> sum -> print

其中,箭头表示算子之间的边。

DAG转换成JobGraph

将DAG转换成JobGraph是Flink编译器的一个重要工作,它将DAG抽象成为一个任务的执行计划。

Flink编译器在将DAG转换成JobGraph时,会对每个算子进行预处理,构建出算子的执行环境和执行节点。在每个执行节点上,Flink编译器会将算子转换成任务(Task),执行完整的序列化和反序列化等操作后,将任务加入到JobGraph中。

下面是一个完整的示例程序,它将实现对输入流的单词计数,并输出到指定文件:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
    for (String word: value.split(" ")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

JobClient jobClient = JobManagerClient.create(JobID.generate(), new Configuration(), "localhost");

jobClient.submitJob(jobGraph).get();

在上述程序中,我们使用了FlinkKafkaConsumer010从Kafka中读取数据,使用flatMap对输入数据进行单词计数,使用keyBy对单词进行分组,使用reduce对单词进行统计,最终使用writeAsText将结果输出到指定文件。

在程序的最后,我们通过调用getStreamGraph方法可以获取到整个程序的DAG图,然后使用getJobGraph方法将DAG图转换成JobGraph。最终,我们通过JobManagerClient的submitJob方法提交JobGraph到JobManager进行执行。

两个JobGraph生成的示例

下面是两个不同的Flink任务的JobGraph,它们分别来自于两个不同的Flink程序:

示例一

该示例程序也是一个单词计数程序,它使用socketTextStream读取数据,使用keyBy、sum和print进行数据处理和输出。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("localhost", 9999)
    .flatMap((String line, Collector<String> out) -> {
        for (String word: line.split(" ")) {
            out.collect(word);
        }
    })
    .keyBy(word -> word)
    .sum(1)
    .print();

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

上述程序的JobGraph如下图所示:

JobGraph
├─JobVertex (source: Socket Stream -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction@3d323106
│       └─Output: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│            └─TypeInformation: org.apache.flink.api.common.typeinfo.BasicTypeInfo@13aa97a9
├─JobVertex (keyBy: STRING) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│       └─Output: T->keyBy: STRING->S: org.apache.flink.api.java.tuple.Tuple2

...

... 

上述JobGraph共包含两个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于socketTextStream方法,输出为单个字符串。第二个JobVertex即为对单词进行分组和统计的部分,它的输入来自于第一个JobVertex,输出类型为Tuple2<String, Integer>

示例二

该示例程序同样是一个单词计数程序,它使用FlinkKafkaConsumer010从Kafka中读取数据,并使用keyBy、reduce和writeAsText将结果输出到指定文件。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
    for (String word: value.split(" ")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

上述程序的JobGraph如下图所示:

JobGraph
├─JobVertex (source: Kafka -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010@4536ed41
│       └─Output: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (reducer: Reduce) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│       └─Output: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (sink: Text File Sink) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│       └─Output: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3

...

上述JobGraph共包含三个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于FlinkKafkaConsumer010对象,输出类型为Tuple2<String, Integer>。第二个JobVertex和第一个JobVertex类似,即为对单词进行统计的部分,而第三个JobVertex为数据输出部分,将结果输出到指定文件。

总结

本文详细讲解了Flink JobGraph生成源码解析的完整攻略,包括JobGraph的生成流程、Flink程序转换成DAG、DAG转换成JobGraph等。最后,我们给出了两个Flink程序的JobGraph示例,帮助读者更好地理解JobGraph的生成过程。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Flink JobGraph生成源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Springboot 全局日期格式化处理的实现

    针对这个主题,以下是完整的攻略: 一、为什么需要全局日期格式化处理? 在SpringBoot的开发中,我们经常需要将数据转化为特定的格式,比如日期数据。Java的日期处理比较麻烦,结果多种多样。这时候,我们就需要一种全局的方式,将日期格式化成我们需要的格式,节省开发时间。 二、如何实现全局日期格式化处理? SpringBoot提供了多种方式实现全局日期格式化…

    Java 2023年5月20日
    00
  • Java超详细透彻讲解static

    Java超详细透彻讲解static 什么是static 在Java中,static关键字可以用来修饰变量、方法和代码块,它表示这个成员在类中只有一份,不需要创建实例就能访问。也可以说,static修饰的成员属于类本身而不属于对象。 static变量 static变量是在类中使用static关键字来修饰的变量,它是类共享的,对于该类的所有对象来说,static…

    Java 2023年5月26日
    00
  • 详解Java如何优雅的使用策略模式

    详解Java如何优雅的使用策略模式 策略模式(Strategy Pattern)属于行为型设计模式,它定义了一系列算法,将每个算法封装起来,并使它们可以互换。策略模式让算法的变化独立于使用算法的客户端,客户端通过传递不同的策略对象来使用不同的算法。 在Java里,策略模式的实现有很多种方法,接下来将说明其中一种优雅的实现方式。 1. 定义接口和实现策略 首先…

    Java 2023年5月19日
    00
  • Java中线程组ThreadGroup与线程池的区别及示例

    Java中线程池与线程组ThreadGroup的区别及示例 线程池 线程池是一种线程的管理机制,它可以重用已经创建的线程,避免重复创建、销毁线程的开销,提高系统的效率。Java中通过java.util.concurrent.Executor提供了线程池的支持,并且线程池中的线程是由线程池自行管理的,开发者无需感知线程的创建、销毁等底层结构。 线程池的使用流程…

    Java 2023年5月30日
    00
  • 深入Java分布式计算的使用分析

    深入Java分布式计算的使用分析 简介 随着大数据和云计算的发展,分布式计算变得越来越重要。Java作为一种广泛使用的编程语言,也具有强大的分布式计算能力。深入学习Java分布式计算,可以帮助解决大规模数据处理和计算问题。 本文将从以下几个方面深入讲解Java分布式计算的使用: 分布式计算概念 Java分布式计算框架概述 使用示例 分布式计算概念 分布式计算…

    Java 2023年5月31日
    00
  • Java多线程之条件对象Condition

    Java多线程中的条件对象Condition是在java.util.concurrent.locks包下的,它和synchronized关键字一样,可以协调线程的执行顺序和通信,不过其功能更为强大,可用于等待条件、通知单个线程和通知所有等待线程。 一、条件对象Condition的基本用法 1. 创建Condition对象 在使用Condition对象前,需要…

    Java 2023年5月19日
    00
  • Angualrjs 表单验证的两种方式(失去焦点验证和点击提交验证)

    AngularJS提供了丰富的表单验证指令,可以轻松实现对用户输入的校验,以保证数据的准确性和完整性。 失去焦点验证 AngularJS通过ng-blur指令可以很方便地实现失去焦点时的表单验证。具体步骤如下: 在HTML表单元素上添加相应的验证指令,如ng-pattern、ng-minlength、ng-maxlength等; 添加一个提示信息的元素或指令…

    Java 2023年6月15日
    00
  • 用Java实现24点游戏

    用Java实现24点游戏攻略 游戏规则 24点游戏是一种比较常见的撕牌游戏,游戏过程如下: 取出4张扑克牌,其中可能包含1-10、J、Q、K四种牌面; 对玩家来说,可以自由任意(+-*/)组合这4张扑克牌,使其结果为24即可; 玩家须进行计算,并在30秒内作出答案,如果时间到了仍没有答案则选手视为失败。 游戏实现思路 为实现24点游戏,我们可以通过Java实…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部