Flink JobGraph生成源码解析

下面是详细讲解“Flink JobGraph生成源码解析”的完整攻略。

什么是Flink JobGraph

Flink JobGraph是Apache Flink的一个重要模块,它描述了一个Flink任务的数据流和操作。在Flink任务启动时,JobGraph会被构建出来,并提交到JobManager进行执行。

JobGraph的生成流程

Flink JobGraph的生成流程可以分为以下几个步骤:

  1. 用户编写Flink程序代码
  2. Flink编译器将用户编写的代码转换成一个DAG(有向无环图),该DAG描述了Flink程序中的数据流和操作。
  3. Flink编译器将DAG转换成一个JobGraph对象,该对象描述了Flink任务的数据流和操作。
  4. JobGraph通过网络传输到JobManager,并由JobManager进行任务调度和执行。

其中,第2步和第3步是比较关键的,下面我们将详细解析这两个步骤的流程。

Flink程序转换成DAG

当用户编写Flink程序时,实际上是在创建数据流。Flink中的数据流可以看做是一系列的逻辑算子(Operator)组成的有向无环图(DAG)。每个算子代表一个数据处理操作,每个算子之间通过边连接,传递数据流。

下面是一个简单的Flink程序示例,该程序将输入的字符串分割成单词,并统计单词出现的次数:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("localhost", 9999)
    .flatMap((String line, Collector<String> out) -> {
        for (String word: line.split(" ")) {
            out.collect(word);
        }
    })
    .keyBy(word -> word)
    .sum(1)
    .print();

上述程序中,通过socketTextStream方法创建了一个数据源,flatMap方法对数据进行单词分割,keyBy方法对单词进行分组,sum方法对单词出现的次数进行统计,最终通过print方法输出到控制台。

Flink编译器在编译该程序时,会分析程序中的逻辑算子,生成对应的Operator实例,并将这些Operator连接成一个DAG,如下图所示:

socketTextStream -> flatMap -> keyBy -> sum -> print

其中,箭头表示算子之间的边。

DAG转换成JobGraph

将DAG转换成JobGraph是Flink编译器的一个重要工作,它将DAG抽象成为一个任务的执行计划。

Flink编译器在将DAG转换成JobGraph时,会对每个算子进行预处理,构建出算子的执行环境和执行节点。在每个执行节点上,Flink编译器会将算子转换成任务(Task),执行完整的序列化和反序列化等操作后,将任务加入到JobGraph中。

下面是一个完整的示例程序,它将实现对输入流的单词计数,并输出到指定文件:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
    for (String word: value.split(" ")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

JobClient jobClient = JobManagerClient.create(JobID.generate(), new Configuration(), "localhost");

jobClient.submitJob(jobGraph).get();

在上述程序中,我们使用了FlinkKafkaConsumer010从Kafka中读取数据,使用flatMap对输入数据进行单词计数,使用keyBy对单词进行分组,使用reduce对单词进行统计,最终使用writeAsText将结果输出到指定文件。

在程序的最后,我们通过调用getStreamGraph方法可以获取到整个程序的DAG图,然后使用getJobGraph方法将DAG图转换成JobGraph。最终,我们通过JobManagerClient的submitJob方法提交JobGraph到JobManager进行执行。

两个JobGraph生成的示例

下面是两个不同的Flink任务的JobGraph,它们分别来自于两个不同的Flink程序:

示例一

该示例程序也是一个单词计数程序,它使用socketTextStream读取数据,使用keyBy、sum和print进行数据处理和输出。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.socketTextStream("localhost", 9999)
    .flatMap((String line, Collector<String> out) -> {
        for (String word: line.split(" ")) {
            out.collect(word);
        }
    })
    .keyBy(word -> word)
    .sum(1)
    .print();

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

上述程序的JobGraph如下图所示:

JobGraph
├─JobVertex (source: Socket Stream -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction@3d323106
│       └─Output: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│            └─TypeInformation: org.apache.flink.api.common.typeinfo.BasicTypeInfo@13aa97a9
├─JobVertex (keyBy: STRING) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│       └─Output: T->keyBy: STRING->S: org.apache.flink.api.java.tuple.Tuple2

...

... 

上述JobGraph共包含两个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于socketTextStream方法,输出为单个字符串。第二个JobVertex即为对单词进行分组和统计的部分,它的输入来自于第一个JobVertex,输出类型为Tuple2<String, Integer>

示例二

该示例程序同样是一个单词计数程序,它使用FlinkKafkaConsumer010从Kafka中读取数据,并使用keyBy、reduce和writeAsText将结果输出到指定文件。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));

stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
    for (String word: value.split(" ")) {
        out.collect(new Tuple2<>(word, 1));
    }
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

上述程序的JobGraph如下图所示:

JobGraph
├─JobVertex (source: Kafka -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010@4536ed41
│       └─Output: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (reducer: Reduce) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│       └─Output: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (sink: Text File Sink) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│    ├─Parallelism: 1
│    └─Input: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│       └─Output: org.apache.flink.api.java.tuple.Tuple2
│            └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3

...

上述JobGraph共包含三个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于FlinkKafkaConsumer010对象,输出类型为Tuple2<String, Integer>。第二个JobVertex和第一个JobVertex类似,即为对单词进行统计的部分,而第三个JobVertex为数据输出部分,将结果输出到指定文件。

总结

本文详细讲解了Flink JobGraph生成源码解析的完整攻略,包括JobGraph的生成流程、Flink程序转换成DAG、DAG转换成JobGraph等。最后,我们给出了两个Flink程序的JobGraph示例,帮助读者更好地理解JobGraph的生成过程。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Flink JobGraph生成源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Sprint Boot @ConditionalOnMissingBean使用方法详解

    @ConditionalOnMissingBean是Spring Boot中的一个注解,它用于根据Spring容器中是否缺少指定的Bean来决定是否启用或禁用某个组件。在使用Spring Boot开发应用程序时,@ConditionalOnMissingBean是非常有用的。本文将详细介绍@ConditionalOnMissingBean的作用和使用方法,并…

    Java 2023年5月5日
    00
  • 解决vue动态下拉菜单 有数据未反应的问题

    针对“解决vue动态下拉菜单 有数据未反应的问题”的问题,下面是完整的解决攻略。 问题描述 在vue动态下拉菜单的实现中,经常会遇到数据未能反应到下拉菜单中的问题,这可能是由于数据未正确绑定或未正确更新导致的。这种情况下,我们需要对代码进行调试和修改,以确保数据正确地反应到下拉菜单中。 解决攻略 下面是解决vue动态下拉菜单数据未反应的完整攻略: 步骤一:数…

    Java 2023年6月15日
    00
  • 图解Java经典算法冒泡选择插入希尔排序的原理与实现

    图解Java经典算法冒泡选择插入希尔排序的原理与实现 什么是排序算法? 排序算法是计算机科学中的一类基本算法,它将一个乱序的数据序列按照一定的规则重新排列,使得排序后的序列满足特定的要求。 常见的排序方法包括冒泡排序、选择排序、插入排序、希尔排序、归并排序、快速排序等。 冒泡排序的原理和实现 冒泡排序是一种简单的排序算法,其基本思想是从小到大依次比较相邻的两…

    Java 2023年5月19日
    00
  • spring+mybatis实现图书管理系统

    以下是“spring+mybatis实现图书管理系统”的完整攻略。 1. 环境准备 首先需要准备好开发环境,包括以下工具和框架: JDK(Java Development Kit): 用于编译和运行Java程序的开发工具包。 Eclipse(或其他Java开发工具):用于编写和调试Java代码的集成开发环境(IDE)。 Maven:Java项目的构建工具,用…

    Java 2023年6月15日
    00
  • 详解使用canvas保存网页为pdf文件支持跨域

    详解使用canvas保存网页为PDF文件支持跨域的完整攻略。 1. 简介 现在越来越多的网站需要支持生成PDF文件。而通过canvas来保存HTML页面为PDF文件是非常流行的一种解决方案,同时它也支持跨域。 2. 实现过程 2.1 引入jsPDF库 我们会使用到一个叫做jsPDF的库来实现将HTML页面转为PDF文件的操作。所以我们首先需要在HTML页面中…

    Java 2023年6月16日
    00
  • 详解SpringMVC 自动封装枚举类的方法

    以下是关于“详解SpringMVC 自动封装枚举类的方法”的完整攻略,其中包含两个示例。 详解SpringMVC 自动封装枚举类的方法 在SpringMVC中,我们可以使用自动封装枚举类的方法来简化代码。在本文中,我们将讲解如何使用自动封装枚举类的方法来简化SpringMVC代码。 自动封装枚举类的方法 在SpringMVC中,我们可以使用自动封装枚举类的方…

    Java 2023年5月17日
    00
  • SSH整合中 hibernate托管给Spring得到SessionFactory

    需要完成整合的总体目的: 将SSH框架中的Hibernate托管给Spring,获取SessionFactory对象并使用SessionFactory对象创建与数据库的会话。 为了达到使用Hibernate的目的,还需要配置 数据源、事务管理器、持久化类映射等。 达到以上目的,步骤如下: 1. 引入依赖 在POM文件中添加 Hibernate、Spring、…

    Java 2023年5月20日
    00
  • jsp+servlet实现最简单的增删改查代码分享

    下面来详细讲解 JSP+Servlet 实现最简单的增删改查代码分享的完整攻略。 1. 准备工作 在开始编写代码之前,需要先准备好以下工具和环境: JDK 1.8 或以上版本 Tomcat 8 或以上版本 Eclipse 或其他 Java IDE 2. 创建项目 在 Eclipse 中创建新的动态 Web 项目,选择 Web Application 项目类型…

    Java 2023年6月15日
    00
合作推广
合作推广
分享本页
返回顶部