下面是详细讲解“Flink JobGraph生成源码解析”的完整攻略。
什么是Flink JobGraph
Flink JobGraph是Apache Flink的一个重要模块,它描述了一个Flink任务的数据流和操作。在Flink任务启动时,JobGraph会被构建出来,并提交到JobManager进行执行。
JobGraph的生成流程
Flink JobGraph的生成流程可以分为以下几个步骤:
- 用户编写Flink程序代码
- Flink编译器将用户编写的代码转换成一个DAG(有向无环图),该DAG描述了Flink程序中的数据流和操作。
- Flink编译器将DAG转换成一个JobGraph对象,该对象描述了Flink任务的数据流和操作。
- JobGraph通过网络传输到JobManager,并由JobManager进行任务调度和执行。
其中,第2步和第3步是比较关键的,下面我们将详细解析这两个步骤的流程。
Flink程序转换成DAG
当用户编写Flink程序时,实际上是在创建数据流。Flink中的数据流可以看做是一系列的逻辑算子(Operator)组成的有向无环图(DAG)。每个算子代表一个数据处理操作,每个算子之间通过边连接,传递数据流。
下面是一个简单的Flink程序示例,该程序将输入的字符串分割成单词,并统计单词出现的次数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9999)
.flatMap((String line, Collector<String> out) -> {
for (String word: line.split(" ")) {
out.collect(word);
}
})
.keyBy(word -> word)
.sum(1)
.print();
上述程序中,通过socketTextStream方法创建了一个数据源,flatMap方法对数据进行单词分割,keyBy方法对单词进行分组,sum方法对单词出现的次数进行统计,最终通过print方法输出到控制台。
Flink编译器在编译该程序时,会分析程序中的逻辑算子,生成对应的Operator实例,并将这些Operator连接成一个DAG,如下图所示:
socketTextStream -> flatMap -> keyBy -> sum -> print
其中,箭头表示算子之间的边。
DAG转换成JobGraph
将DAG转换成JobGraph是Flink编译器的一个重要工作,它将DAG抽象成为一个任务的执行计划。
Flink编译器在将DAG转换成JobGraph时,会对每个算子进行预处理,构建出算子的执行环境和执行节点。在每个执行节点上,Flink编译器会将算子转换成任务(Task),执行完整的序列化和反序列化等操作后,将任务加入到JobGraph中。
下面是一个完整的示例程序,它将实现对输入流的单词计数,并输出到指定文件:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));
stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
for (String word: value.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
JobClient jobClient = JobManagerClient.create(JobID.generate(), new Configuration(), "localhost");
jobClient.submitJob(jobGraph).get();
在上述程序中,我们使用了FlinkKafkaConsumer010从Kafka中读取数据,使用flatMap对输入数据进行单词计数,使用keyBy对单词进行分组,使用reduce对单词进行统计,最终使用writeAsText将结果输出到指定文件。
在程序的最后,我们通过调用getStreamGraph方法可以获取到整个程序的DAG图,然后使用getJobGraph方法将DAG图转换成JobGraph。最终,我们通过JobManagerClient的submitJob方法提交JobGraph到JobManager进行执行。
两个JobGraph生成的示例
下面是两个不同的Flink任务的JobGraph,它们分别来自于两个不同的Flink程序:
示例一
该示例程序也是一个单词计数程序,它使用socketTextStream读取数据,使用keyBy、sum和print进行数据处理和输出。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9999)
.flatMap((String line, Collector<String> out) -> {
for (String word: line.split(" ")) {
out.collect(word);
}
})
.keyBy(word -> word)
.sum(1)
.print();
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
上述程序的JobGraph如下图所示:
JobGraph
├─JobVertex (source: Socket Stream -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│ ├─Parallelism: 1
│ └─Input: org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction@3d323106
│ └─Output: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│ └─TypeInformation: org.apache.flink.api.common.typeinfo.BasicTypeInfo@13aa97a9
├─JobVertex (keyBy: STRING) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│ ├─Parallelism: 1
│ └─Input: T->[Socket Stream -> Flat Map -> Map]->S: java.lang.String
│ └─Output: T->keyBy: STRING->S: org.apache.flink.api.java.tuple.Tuple2
...
...
上述JobGraph共包含两个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于socketTextStream方法,输出为单个字符串。第二个JobVertex即为对单词进行分组和统计的部分,它的输入来自于第一个JobVertex,输出类型为Tuple2<String, Integer>
。
示例二
该示例程序同样是一个单词计数程序,它使用FlinkKafkaConsumer010从Kafka中读取数据,并使用keyBy、reduce和writeAsText将结果输出到指定文件。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));
stream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
for (String word: value.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
})
.keyBy(0)
.reduce((Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1))
.writeAsText("./output/word-count.txt");
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
上述程序的JobGraph如下图所示:
JobGraph
├─JobVertex (source: Kafka -> Flat Map -> Map) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│ ├─Parallelism: 1
│ └─Input: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010@4536ed41
│ └─Output: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│ └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (reducer: Reduce) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│ ├─Parallelism: 1
│ └─Input: T->[Kafka -> Flat Map -> Map]->S: org.apache.flink.api.java.tuple.Tuple2
│ └─Output: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│ └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
├─JobVertex (sink: Text File Sink) : (Parallelism: 1, Max Parallelism: DEFAULT_MAX_PARALLELISM)
│ ├─Parallelism: 1
│ └─Input: T->reducer: Reduce->S: org.apache.flink.api.java.tuple.Tuple2
│ └─Output: org.apache.flink.api.java.tuple.Tuple2
│ └─TypeInformation: org.apache.flink.api.java.typeutils.TupleTypeInfo@5d22bbb3
...
上述JobGraph共包含三个JobVertex,第一个JobVertex即为数据源读取和处理的部分,它的输入来自于FlinkKafkaConsumer010对象,输出类型为Tuple2<String, Integer>
。第二个JobVertex和第一个JobVertex类似,即为对单词进行统计的部分,而第三个JobVertex为数据输出部分,将结果输出到指定文件。
总结
本文详细讲解了Flink JobGraph生成源码解析的完整攻略,包括JobGraph的生成流程、Flink程序转换成DAG、DAG转换成JobGraph等。最后,我们给出了两个Flink程序的JobGraph示例,帮助读者更好地理解JobGraph的生成过程。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Flink JobGraph生成源码解析 - Python技术站