Flink JobGraph生成源码解析

下面是详细讲解“Flink JobGraph生成源码解析”的完整攻略。

什么是Flink JobGraph

Flink JobGraph是Apache Flink的一个重要模块,它描述了一个Flink任务的数据流和操作。在Flink任务启动时,JobGraph会被构建出来,并提交到JobManager进行执行。

JobGraph的生成流程

Flink JobGraph的生成流程可以分为以下几个步骤:

  1. 用户编写Flink程序代码
  2. Flink编译器将用户编写的代码转换成一个DAG(有向无环图),该DAG描述了Flink程序中的数据流和操作。
  3. Flink编译器将DAG转换成一个JobGraph对象,该对象描述了Flink任务的数据流和操作。
  4. JobGraph通过网络传输到JobManager,并由JobManager进行任务调度和执行。

其中,第2步和第3步是比较关键的,下面我们将详细解析这两个步骤的流程。

Flink程序转换成DAG

当用户编写Flink程序时,实际上是在创建数据流。Flink中的数据流可以看做是一系列的逻辑算子(Operator)组成的有向无环图(DAG)。每个算子代表一个数据处理操作,每个算子之间通过边连接,传递数据流。

下面是一个简单的Flink程序示例,该程序将输入的字符串分割成单词,并统计单词出现的次数:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("localhost", 9999)
    .flatMap((String line, Collector<String> out) -> {
        for (String word: line.split(" ")) {
            out.collect(word);
        }
    })
    .keyBy(word -> word)
    .sum(1)
    .print();

上述程序中,通过socketTextStream方法创建了一个数据源,flatMap方法对数据进行单词分割,keyBy方法对单词进行分组,sum方法对单词出现的次数进行统计,最终通过print方法输出到控制台。

Flink编译器在编译该程序时,会分析程序中的逻辑算子,生成对应的Operator实例,并将这些Operator连接成一个DAG,如下图所示:

socketTextStream -> flatMap -> keyBy -> sum -> print

其中,箭头表示算子之间的边。

DAG转换成JobGraph

将DAG转换成JobGraph是Flink编译器的一个重要工作,它将DAG抽象成为一个任务的执行计划。

Flink编译器在将DAG转换成JobGraph时,会对每个算子进行预处理,构建出算子的执行环境和执行节点。在每个执行节点上,Flink编译器会将算子转换成任务(Task),执行完整的序列化和反序列化等操作后,将任务加入到JobGraph中。

下面是一个完整的示例程序,它将实现对输入流的单词计数,并输出到指定文件:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
    for (String word: value.split(" ")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

JobClient jobClient = JobManagerClient.create(JobID.generate(), new Configuration(), "localhost");

jobClient.submitJob(jobGraph).get();

在上述程序中,我们使用了FlinkKafkaConsumer010从Kafka中读取数据,使用flatMap对输入数据进行单词计数,使用keyBy对单词进行分组,使用reduce对单词进行统计,最终使用writeAsText将结果输出到指定文件。

在程序的最后,我们通过调用getStreamGraph方法可以获取到整个程序的DAG图,然后使用getJobGraph方法将DAG图转换成JobGraph。最终,我们通过JobManagerClient的submitJob方法提交JobGraph到JobManager进行执行。

两个JobGraph生成的示例

下面是两个不同的Flink任务的JobGraph,它们分别来自于两个不同的Flink程序:

示例一

该示例程序也是一个单词计数程序,它使用socketTextStream读取数据,使用keyBy、sum和print进行数据处理和输出。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("localhost", 9999)
    .flatMap((String line, Collector<String> out) -> {
        for (String word: line.split(" ")) {
            out.collect(word);
        }
    })
    .keyBy(word -> word)
    .sum(1)
    .print();

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

上述程序的JobGraph如下图所示:

JobGraph
├─JobVertex (source: Socket Stream -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction@3d323106
│       └─Output: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│            └─TypeInformation: org.apache.flink.api.common.typeinfo.BasicTypeInfo@13aa97a9
├─JobVertex (keyBy: STRING) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│       └─Output: T->keyBy: STRING->S: org.apache.flink.api.java.tuple.Tuple2

...

... 

上述JobGraph共包含两个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于socketTextStream方法,输出为单个字符串。第二个JobVertex即为对单词进行分组和统计的部分,它的输入来自于第一个JobVertex,输出类型为Tuple2<String, Integer>

示例二

该示例程序同样是一个单词计数程序,它使用FlinkKafkaConsumer010从Kafka中读取数据,并使用keyBy、reduce和writeAsText将结果输出到指定文件。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
    for (String word: value.split(" ")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

上述程序的JobGraph如下图所示:

JobGraph
├─JobVertex (source: Kafka -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010@4536ed41
│       └─Output: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (reducer: Reduce) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│       └─Output: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (sink: Text File Sink) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│       └─Output: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3

...

上述JobGraph共包含三个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于FlinkKafkaConsumer010对象,输出类型为Tuple2<String, Integer>。第二个JobVertex和第一个JobVertex类似,即为对单词进行统计的部分,而第三个JobVertex为数据输出部分,将结果输出到指定文件。

总结

本文详细讲解了Flink JobGraph生成源码解析的完整攻略,包括JobGraph的生成流程、Flink程序转换成DAG、DAG转换成JobGraph等。最后,我们给出了两个Flink程序的JobGraph示例,帮助读者更好地理解JobGraph的生成过程。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Flink JobGraph生成源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Maven打包跳过测试的三种方法

    这里提供三种 Maven 打包跳过测试的方法: 方法一:使用 “-DskipTests” 参数 这是打包时常见使用的参数, 可以跳过测试并进行打包。在命令行中使用参数如下: mvn clean package -DskipTests 或者在 pom.xml 文件中的 build 标签中添加如下配置: <build> <plugins>…

    Java 2023年5月19日
    00
  • Ajax添加数据与删除篇实现代码

    下面详细讲解“Ajax添加数据与删除篇实现代码”的完整攻略。 一、准备工作 在正式开始编写Ajax添加数据与删除篇的实现代码前,需要先完成以下准备工作: 确保你已经学习过Ajax基础知识,包括Ajax的基本流程、请求方式、回调函数等等。 确定添加数据与删除篇功能需要操作的数据表格,包括表格名称、字段名称等等。 熟悉服务器端处理Ajax请求的技术,例如PHP、…

    Java 2023年6月15日
    00
  • java的Hibernate框架报错“NonUniqueObjectException”的原因和解决方法

    当使用Hibernate框架时,可能会遇到“NonUniqueObjectException”错误。这个错误通常是由于以下原因之一引起的: 多个实体对象具有相同的标识符:如果您的多个实体对象具有相同的标识符,则可能会出现此错误。在这种情况下,需要检查您的实体对象并确保它们具有唯一的标识符。 会话中存在多个实体对象:如果您的会话中存在多个实体对象,则可能会出现…

    Java 2023年5月4日
    00
  • maven install报错中程序包xxx不存在的问题解决

    这里是“maven install报错中程序包xxx不存在的问题解决”的完整攻略。 问题描述 在使用Maven构建项目时,有时候会遇到类似如下错误信息: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile (default-co…

    Java 2023年5月19日
    00
  • java多线程实现服务器端与多客户端之间的通信

    以下是“Java多线程实现服务器端与多客户端之间的通信”的完整攻略: 1. 确定通信协议 在服务器端与多客户端之间进行通信的前提是要确定一个基于网络的通信协议。一般情况下,TCP协议是实现这样的通信的最好选择。TCP协议通过三次握手建立连接,确保数据完整性,是一种可靠的协议。所以,我们需要在项目中导入java.net包,来使用TCP协议的功能。 2. 编写服…

    Java 2023年5月19日
    00
  • Java中数组的定义和使用教程(二)

    当我编写了有关Java中数组的定义和使用教程(二)的文章时,我旨在帮助初学者更好地理解Java中数组的使用,下面详细介绍一下这篇教程: 一、定义数组 定义数组的一般格式如下: dataType[] arrayRefVar = new dataType[arraySize]; 其中: dataType:可以是任何的Java类型,例如:int、double、by…

    Java 2023年5月26日
    00
  • 简单了解java数组传递方法

    下面是关于“简单了解Java数组传递方法”的完整攻略。 一、Java数组简介 数组是Java编程语言中的一种引用类型,它是一种容器,用于存储固定数量的相同类型的数据。数组可以存储基本数据类型(如int,float,double)、对象(如String)和其他数组类型。 Java数组的声明如下: type[] arrayName; 其中,type可以是任何数据…

    Java 2023年5月26日
    00
  • Java中使用JCOM操作Office对象

    以下是Java中使用JCOM操作Office对象的完整攻略: 1. JCOM简介 JCOM是Java对COM(Component Object Model)对象进行访问的类库,简化了访问COM对象的方式,使得Java程序可以轻松地调用诸如Office、Excel等COM组件。使用JCOM时需要预先安装JavaComBridge并注册注册JavaComBrid…

    Java 2023年6月16日
    00
合作推广
合作推广
分享本页
返回顶部