Flink部署集群整体架构源码分析

完整攻略:Flink部署集群整体架构源码分析

简介

Apache Flink是一款开源的流式数据处理引擎,能够实现高效、准确、低延迟的数据处理和分析。本文将深入分析Flink的部署集群整体架构源码,为读者提供全面的技术指南。

部署集群整体架构源码分析

架构概述

Flink的整体架构可分为三层:Client、JobManager、TaskManager。其中,Client作为前端,用于提交任务;JobManager作为控制中心,负责任务调度、状态管理等;TaskManager负责实际的数据处理任务。

Client与JobManager通过RPC进行通讯,JobManager与TaskManager之间也是通过RPC进行通讯。JobManager会周期性地心跳TaskManager,以确保TaskManager在线。

在整个架构中,由于TaskManager与Data Sources之间存在网络延迟等问题,引入了一些额外的组件:Data Sinks、Buffer、Network。

代码分析

先来看Flink的整体启动流程:

public static void main(String[] args) throws Exception {
    // ...
    Configuration configuration = ...
    // 根据conf config创建一个CliFrontend对象
    final CliFrontend cli = new CliFrontend(configuration);
    // 调用CliFrontend的run方法启动CLI咨询工具
    int retCode = SecurityUtils.getInstalledContextClassLoader().run(
        new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return cli.run(args);
            }
        }
    );
    // ...
}

CliFrontend类是前端的实现,它将任务提交给JobManager。

CliFrontend启动后,会通过JobClient将任务提交到JobManager:

// 创建JobGraph
JobGraph jobGraph = ...
// 通过JobClient提交JobGraph
JobClient jobClient = JobClient.submitJobAndWaitForResult(clusterClient, jobGraph, false);

JobClient.submitJobAndWaitForResult()方法用于向JobManager提交任务,并等待任务完成。

在JobClient.startJobExecution()方法中,会先向JobManager发送一个JobSubmissionMessage:

private JobSubmissionResult startJobExecution(
    JobGraph jobGraph,
    boolean detached) throws Exception {

    ExecutionConfigAccessor executionConfigAccessor = new ExecutionConfigAccessor(jobGraph.getExecutionConfig());
    JobSubmissionMessage jobSubmissionMessage = new JobSubmissionMessage(
        jobGraph,
        executionConfigAccessor.getCheckpointConfig(),
        executionConfigAccessor.getSavepointRestoreSettings(),
        uploadedJarFileBlobKeys,
        detached);
    final CompletableFuture<Acknowledge> submissionResultFuture = jobManagerGateway.submitJob(jobSubmissionMessage, timeout);
    ...
}

JobManager收到JobSubmissionMessage之后,会在JobMasterGateway.onJobSubmitted()方法中初始化任务:

@Override
public CompletableFuture<Acknowledge> submitJob(final JobSubmissionMessage jobSubmissionMessage, final Time timeout) {
    try {
      if (jobSubmissionMessage.getUserCodeBlobKeys().length > 0) { // User Code
        Future<Iterable<Path>> uploadResultFuture = userJobClassLoader.uploadUserArtifacts(userCodeBlobKeys);
        uploadResultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
      }

      return executeMischa(() -> {
        JobGraph jobGraph = jobSubmissionMessage.getJobGraph();
        jobGraph.setAllowQueuedScheduling(jobSubmissionMessage.getDetachedMode());

        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = new CompletableFuture<>();
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();

        classLoaderLeaseRetriever.retrieveClassLoaderLease(jobSubmissionMessage.getUserCodeBlobKeys()).thenAccept(classLoader -> {

          UserCodeClassLoaderFactory.DelegationResource delegationResource = new UserCodeClassLoaderFactory.DelegationResource(classLoader, jobGraph.getClasspaths(), jobGraph.getGlobalClasspaths(), jobGraph.getClassLoaderResolveOrder(), jobGraph.getUserClassLoaderParentFirstPatterns());

          classLoaderUserCodes.put(jobId, delegationResource);

          if (slotPoolContainer == null) {
            slotPoolContainer = slotPoolService.createSlotPool(jobId);
            slotPoolOwner = slotPoolContainer.getSlotPoolOwner();
          }

          final JobMaster jobMaster = jobManagerJobMetricGroup.<JobMaster>addGroup("JobMaster")
              .closeable(() -> {
                jobManagerMetricGroup.close();
                classLoaderLeaseRetriever.closeAsync();
                classLoaderUserCodes.remove(jobId);
                ExceptionUtils.suppressExceptions(() -> slotPoolContainer.close());
              })
              .getIOMetricGroup()
              .map(ioMetricGroup -> jobManagerServices.createJobMaster(
                  jobGraph,
                  ioExecutor,
                  slotPoolContainer,
                  leaderElectionService,
                  checkpointer,
                  classLoader,
                  delegationResource.getClasspaths(),
                  delegationResource.getGlobalClasspaths(),
                  delegationResource.getClassLoaderResolveOrder(),
                  delegationResource.getUserClassLoaderParentFirstPatterns(),
                  blobServer,
                  eventPublisher,
                  rpcTimeout,
                  ioExecutor.getCheckpointRecoveryFactory(),
                  schedulerOperatorFactory,
                  ioMetricGroup,
                  isTimeTrackingMetricsEnabled(),
                  gatewayRetriever,
                  partitionTrackerFactory,
                  partitionCoordinator,
                  accumulatorProvider,
                  expireDeploymentsListeners));

          CompletableFuture<Void> jobManagerTerminationFuture = new CompletableFuture<>();

          ExceptionUtils.runJobAsync(() -> jobManager.runJobMaster(jobMaster, jobManagerTerminationFuture), "JobManager (master)", LOG)
              .whenCompleteAsync(
                  (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                      jobManagerTerminationFuture.completeExceptionally(throwable);
                    } else {
                      jobManagerTerminationFuture.complete(null);
                    }
                  },
                  getMainThreadExecutor());

          jobManagerTerminationFuture.whenComplete(
              (Object ignored, Throwable throwable) -> {

                if (throwable instanceof FlinkException) {
                  ExceptionUtils.rethrowException(throwable, "JobManager failed to properly terminate.");
                } else if (throwable != null) {
                  LOG.error("JobManager failed with "+throwable.getMessage(), throwable);
                  try {
                    getSelfGateway(JobMasterGateway.class).closeAsync();
                  } catch (Throwable t) {
                    LOG.warn("Error closing JobManager after initialization failure.", t);
                  }
                  acknowledgeCompletableFuture.completeExceptionally(throwable);
                } else {
                  acknowledgeCompletableFuture.complete(Acknowledge.get());
                }
              });
        });

        return acknowledgeCompletableFuture;
      });
    } catch (Throwable t) {
      ExceptionUtils.rethrow(t);
      return null; // silence the compiler
    }
  }

在JobMaster中,JobGraph会被拆分成多个ExecutionGraph,每个ExecutionGraph都会对应一个TaskManager。ExecutionGraph被拆分成多个Subtask,每个Subtask对应一个Task运行在TaskManager中。

若TaskManager在运行过程中出现了异常,会向对应的JobManager发送一个FailTaskExecutionAttempt消息,以便重新执行该任务。

示例1

以下是一个简单的Flink程序,用于统计输入数据中每个单词出现的次数:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("localhost", 9000);
DataStream<Tuple2<String, Integer>> counts = lines
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    })
    .keyBy(0)
    .sum(1);
counts.print();
env.execute("Word Count");

该程序会从TCP流中读取文本数据,分词后统计每个单词出现的次数。

示例2

下面是一个更加复杂的Flink程序,用于在线实时推荐:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserAction> rawUserActions = env.addSource(new UserActionSource());

DataStream<UserEvent> userEvents = rawUserActions
    .filter(new FilterFunction<UserAction>() {
        @Override
        public boolean filter(UserAction action) {
            return action.getType() == UserActionType.PAGE_VIEW;
        }
    })
    .map(new MapFunction<UserAction, UserEvent>() {
        @Override
        public UserEvent map(UserAction action) {
            return new UserEvent(action.getUserId(), action.getItemId(), action.getTimestamp())
        }
    });

DataStream<ItemEvent> itemEvents = rawUserActions
    .filter(new FilterFunction<UserAction>() {
        @Override
        public boolean filter(UserAction action) {
            return action.getType() == UserActionType.ITEM_UPDATE;
        }
    })
    .map(new MapFunction<UserAction, ItemEvent>() {
        @Override
        public ItemEvent map(UserAction action) {
            return new ItemEvent(action.getItemId(), action.getTimestamp());
        }
    });

DataStream<Tuple2<Integer, String>> recommendations = userEvents
    .keyBy(new KeySelector<UserEvent, Integer>() {
        @Override
        public Integer getKey(UserEvent event) {
            return event.getUserId();
        }
    })
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .apply(new RecommendationFunction(itemEvents));

recommendations.print();
env.execute("Online Real-time Recommendation");

该程序会从UserActionSource输入用户行为,包括PAGE_VIEW和ITEM_UPDATE。每当有新的PAGE_VIEW事件时,程序会尝试根据过去5分钟内的用户行为,推荐适合该用户的ITEM_UPDATE事件。最终,对于每个用户,程序会输出最新一条推荐的ITEM_UPDATE事件。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Flink部署集群整体架构源码分析 - Python技术站

(0)
上一篇 2023年6月27日
下一篇 2023年6月27日

相关文章

  • 详解Java Socket通信封装MIna框架

    详解Java Socket通信封装Mina框架 1. Java Socket通信介绍 Java Socket通信是一种网络通信方式,它是TCP/IP协议的一种实现。在Java中,Socket通信通常被用于构建客户端和服务器端应用程序。Java Socket通信可以使用Java中的Socket类和ServerSocket类来实现。 在Java Socket通信…

    other 2023年6月25日
    00
  • 圣西罗足球场-景点介绍

    以下是关于圣西罗足球场景点介绍的完整攻略,包括基本概念、历史背景、景点介绍和两个示例说明。 圣西罗足球场景点介绍的基本概念 圣西罗足球场是位于意大利米兰的一座足球场,是AC米兰和国际米兰两支足球俱乐部的主场。圣西罗足球场是世界上最著名的足球场之一,也是欧洲最大的足球场之一。 圣西罗足球场景点介绍的历史背景 圣西罗足球场建于1926年,最初是为了举办1928年…

    other 2023年5月7日
    00
  • CS1.6怎么架设服务器 cs1.6服务器架设及终极优化教程

    CS1.6服务器架设及终极优化教程 作为一款经典的第一人称射击游戏,CS1.6自然也有很多玩家想要自己架设服务器。本文将提供一份详细的攻略,帮助玩家搭建自己的CS1.6服务器,并终极优化游戏体验。 硬件要求 为了保证服务器运行顺畅,需要满足一定的硬件要求。推荐硬件配置如下: CPU:Intel Core i5或AMD Ryzen 5以上 内存:8GB或以上 …

    other 2023年6月27日
    00
  • table单元格边框合并

    以下是table单元格边框合并的完整攻略,包括以下内容: 概述 合并单元格边框的方法 示例说明 1. 概述 在HTML中,可以使用table标签创建表格。有时候,需要将表格中的单元格边框合并,以实现更美观的表格效果。本文将介绍如何合并单元格边框。 2. 合并单元格边框的方法 合并单元格边框的方法如下: 使用CSS的border-collapse属性 <…

    other 2023年5月9日
    00
  • 【hyperscan】编译hyperscan 4.0.0

    下面是“【hyperscan】编译hyperscan 4.0.0的完整攻略”,包括安装依赖、下载源码、编译和两个示例说明。 安装依赖 在编译 hyperscan 4.0.0 之前,需要安装以下依赖: CMake 3.4 或更高版本 GCC 4.8 或更高版本 Boost 1.58 或更高版本 可以使用以下命令在 Ubuntu 16.04 中安装这些依赖: s…

    other 2023年5月5日
    00
  • npm卸载及安装流程

    npm卸载及安装流程 npm是Node.js的包管理器,可以方便地安装、卸载和管理Node.js模块。本攻略将介绍的卸载及安装流程,并提供两个示例如下。 卸载npm 如果需要卸载npm,可以使用以下命令: npm uninstall npm -g 这个命令会卸载全局安装的npm包。如果需要卸载本地安装的npm包,可以在项目目录下执行以下命令: npm uni…

    other 2023年5月7日
    00
  • 关于jdk环境变量配置以及javac不是内部或外部命令的解决

    关于JDK环境变量配置以及javac不是内部或外部命令的解决有以下攻略: 配置JAVA_HOME环境变量 前往Oracle官网下载对应版本的JDK,例如jdk-11.0.4_windows-x64_bin.exe 打开安装文件,按照提示进行安装,注意安装路径,以jdk-11.0.4为例,默认路径为C:\Program Files\Java\jdk-11.0.…

    other 2023年6月27日
    00
  • 微信小程序自定义导航教程(兼容各种手机)

    我将详细讲解“微信小程序自定义导航教程(兼容各种手机)”的完整攻略。 一、背景介绍 在微信小程序中,我们经常需要使用自定义导航栏来实现更加个性化的界面效果。然而,不同型号的手机在导航栏高度、胶囊按钮大小和位置等方面都存在差异,因此需要我们设计合理的方案来兼容各种手机。 二、方案设计 1. 设置全局样式: 我们可以在app.wxss文件中设置全局样式,包括导航…

    other 2023年6月25日
    00
合作推广
合作推广
分享本页
返回顶部