Flink部署集群整体架构源码分析

完整攻略:Flink部署集群整体架构源码分析

简介

Apache Flink是一款开源的流式数据处理引擎,能够实现高效、准确、低延迟的数据处理和分析。本文将深入分析Flink的部署集群整体架构源码,为读者提供全面的技术指南。

部署集群整体架构源码分析

架构概述

Flink的整体架构可分为三层:Client、JobManager、TaskManager。其中,Client作为前端,用于提交任务;JobManager作为控制中心,负责任务调度、状态管理等;TaskManager负责实际的数据处理任务。

Client与JobManager通过RPC进行通讯,JobManager与TaskManager之间也是通过RPC进行通讯。JobManager会周期性地心跳TaskManager,以确保TaskManager在线。

在整个架构中,由于TaskManager与Data Sources之间存在网络延迟等问题,引入了一些额外的组件:Data Sinks、Buffer、Network。

代码分析

先来看Flink的整体启动流程:

public static void main(String[] args) throws Exception {
    // ...
    Configuration configuration = ...
    // 根据conf config创建一个CliFrontend对象
    final CliFrontend cli = new CliFrontend(configuration);
    // 调用CliFrontend的run方法启动CLI咨询工具
    int retCode = SecurityUtils.getInstalledContextClassLoader().run(
        new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return cli.run(args);
            }
        }
    );
    // ...
}

CliFrontend类是前端的实现,它将任务提交给JobManager。

CliFrontend启动后,会通过JobClient将任务提交到JobManager:

// 创建JobGraph
JobGraph jobGraph = ...
// 通过JobClient提交JobGraph
JobClient jobClient = JobClient.submitJobAndWaitForResult(clusterClient, jobGraph, false);

JobClient.submitJobAndWaitForResult()方法用于向JobManager提交任务,并等待任务完成。

在JobClient.startJobExecution()方法中,会先向JobManager发送一个JobSubmissionMessage:

private JobSubmissionResult startJobExecution(
    JobGraph jobGraph,
    boolean detached) throws Exception {

    ExecutionConfigAccessor executionConfigAccessor = new ExecutionConfigAccessor(jobGraph.getExecutionConfig());
    JobSubmissionMessage jobSubmissionMessage = new JobSubmissionMessage(
        jobGraph,
        executionConfigAccessor.getCheckpointConfig(),
        executionConfigAccessor.getSavepointRestoreSettings(),
        uploadedJarFileBlobKeys,
        detached);
    final CompletableFuture<Acknowledge> submissionResultFuture = jobManagerGateway.submitJob(jobSubmissionMessage, timeout);
    ...
}

JobManager收到JobSubmissionMessage之后,会在JobMasterGateway.onJobSubmitted()方法中初始化任务:

@Override
public CompletableFuture<Acknowledge> submitJob(final JobSubmissionMessage jobSubmissionMessage, final Time timeout) {
    try {
      if (jobSubmissionMessage.getUserCodeBlobKeys().length > 0) { // User Code
        Future<Iterable<Path>> uploadResultFuture = userJobClassLoader.uploadUserArtifacts(userCodeBlobKeys);
        uploadResultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
      }

      return executeMischa(() -> {
        JobGraph jobGraph = jobSubmissionMessage.getJobGraph();
        jobGraph.setAllowQueuedScheduling(jobSubmissionMessage.getDetachedMode());

        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = new CompletableFuture<>();
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();

        classLoaderLeaseRetriever.retrieveClassLoaderLease(jobSubmissionMessage.getUserCodeBlobKeys()).thenAccept(classLoader -> {

          UserCodeClassLoaderFactory.DelegationResource delegationResource = new UserCodeClassLoaderFactory.DelegationResource(classLoader, jobGraph.getClasspaths(), jobGraph.getGlobalClasspaths(), jobGraph.getClassLoaderResolveOrder(), jobGraph.getUserClassLoaderParentFirstPatterns());

          classLoaderUserCodes.put(jobId, delegationResource);

          if (slotPoolContainer == null) {
            slotPoolContainer = slotPoolService.createSlotPool(jobId);
            slotPoolOwner = slotPoolContainer.getSlotPoolOwner();
          }

          final JobMaster jobMaster = jobManagerJobMetricGroup.<JobMaster>addGroup("JobMaster")
              .closeable(() -> {
                jobManagerMetricGroup.close();
                classLoaderLeaseRetriever.closeAsync();
                classLoaderUserCodes.remove(jobId);
                ExceptionUtils.suppressExceptions(() -> slotPoolContainer.close());
              })
              .getIOMetricGroup()
              .map(ioMetricGroup -> jobManagerServices.createJobMaster(
                  jobGraph,
                  ioExecutor,
                  slotPoolContainer,
                  leaderElectionService,
                  checkpointer,
                  classLoader,
                  delegationResource.getClasspaths(),
                  delegationResource.getGlobalClasspaths(),
                  delegationResource.getClassLoaderResolveOrder(),
                  delegationResource.getUserClassLoaderParentFirstPatterns(),
                  blobServer,
                  eventPublisher,
                  rpcTimeout,
                  ioExecutor.getCheckpointRecoveryFactory(),
                  schedulerOperatorFactory,
                  ioMetricGroup,
                  isTimeTrackingMetricsEnabled(),
                  gatewayRetriever,
                  partitionTrackerFactory,
                  partitionCoordinator,
                  accumulatorProvider,
                  expireDeploymentsListeners));

          CompletableFuture<Void> jobManagerTerminationFuture = new CompletableFuture<>();

          ExceptionUtils.runJobAsync(() -> jobManager.runJobMaster(jobMaster, jobManagerTerminationFuture), "JobManager (master)", LOG)
              .whenCompleteAsync(
                  (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                      jobManagerTerminationFuture.completeExceptionally(throwable);
                    } else {
                      jobManagerTerminationFuture.complete(null);
                    }
                  },
                  getMainThreadExecutor());

          jobManagerTerminationFuture.whenComplete(
              (Object ignored, Throwable throwable) -> {

                if (throwable instanceof FlinkException) {
                  ExceptionUtils.rethrowException(throwable, "JobManager failed to properly terminate.");
                } else if (throwable != null) {
                  LOG.error("JobManager failed with "+throwable.getMessage(), throwable);
                  try {
                    getSelfGateway(JobMasterGateway.class).closeAsync();
                  } catch (Throwable t) {
                    LOG.warn("Error closing JobManager after initialization failure.", t);
                  }
                  acknowledgeCompletableFuture.completeExceptionally(throwable);
                } else {
                  acknowledgeCompletableFuture.complete(Acknowledge.get());
                }
              });
        });

        return acknowledgeCompletableFuture;
      });
    } catch (Throwable t) {
      ExceptionUtils.rethrow(t);
      return null; // silence the compiler
    }
  }

在JobMaster中,JobGraph会被拆分成多个ExecutionGraph,每个ExecutionGraph都会对应一个TaskManager。ExecutionGraph被拆分成多个Subtask,每个Subtask对应一个Task运行在TaskManager中。

若TaskManager在运行过程中出现了异常,会向对应的JobManager发送一个FailTaskExecutionAttempt消息,以便重新执行该任务。

示例1

以下是一个简单的Flink程序,用于统计输入数据中每个单词出现的次数:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines = env.socketTextStream("localhost", 9000);
DataStream<Tuple2<String, Integer>> counts = lines
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    })
    .keyBy(0)
    .sum(1);
counts.print();
env.execute("Word Count");

该程序会从TCP流中读取文本数据,分词后统计每个单词出现的次数。

示例2

下面是一个更加复杂的Flink程序,用于在线实时推荐:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserAction> rawUserActions = env.addSource(new UserActionSource());

DataStream<UserEvent> userEvents = rawUserActions
    .filter(new FilterFunction<UserAction>() {
        @Override
        public boolean filter(UserAction action) {
            return action.getType() == UserActionType.PAGE_VIEW;
        }
    })
    .map(new MapFunction<UserAction, UserEvent>() {
        @Override
        public UserEvent map(UserAction action) {
            return new UserEvent(action.getUserId(), action.getItemId(), action.getTimestamp())
        }
    });

DataStream<ItemEvent> itemEvents = rawUserActions
    .filter(new FilterFunction<UserAction>() {
        @Override
        public boolean filter(UserAction action) {
            return action.getType() == UserActionType.ITEM_UPDATE;
        }
    })
    .map(new MapFunction<UserAction, ItemEvent>() {
        @Override
        public ItemEvent map(UserAction action) {
            return new ItemEvent(action.getItemId(), action.getTimestamp());
        }
    });

DataStream<Tuple2<Integer, String>> recommendations = userEvents
    .keyBy(new KeySelector<UserEvent, Integer>() {
        @Override
        public Integer getKey(UserEvent event) {
            return event.getUserId();
        }
    })
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .apply(new RecommendationFunction(itemEvents));

recommendations.print();
env.execute("Online Real-time Recommendation");

该程序会从UserActionSource输入用户行为,包括PAGE_VIEW和ITEM_UPDATE。每当有新的PAGE_VIEW事件时,程序会尝试根据过去5分钟内的用户行为,推荐适合该用户的ITEM_UPDATE事件。最终,对于每个用户,程序会输出最新一条推荐的ITEM_UPDATE事件。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Flink部署集群整体架构源码分析 - Python技术站

(0)
上一篇 2023年6月27日
下一篇 2023年6月27日

相关文章

  • zookeeper常用端口

    ZooKeeper常用端口攻略 ZooKeeper是一个分布式协调服务,它使用一组端口来提供服务。本文将介绍ZooKeeper常用端口及其用途,并提供两个示例说明。 ZooKeeper常用端口 以下是ZooKeeper常用端口及其用途: 2181:客户端端口,用于连接ZooKeeper集群。 2888:集群内部通信端口,用于选举Leader。 3888:集群…

    other 2023年5月6日
    00
  • Android实现扫描二维码功能

    Android实现扫描二维码功能攻略 本攻略将详细介绍如何在Android应用中实现扫描二维码的功能。我们将使用ZXing库来实现扫描功能,并提供两个示例说明。 步骤一:导入ZXing库 首先,我们需要在Android项目中导入ZXing库。可以通过以下步骤完成导入: 在项目的build.gradle文件中,添加以下依赖项: implementation ‘…

    other 2023年9月6日
    00
  • linux操作系统详解

    Linux 操作系统详解 Linux 操作系统是一种免费、开源的操作系统,被广泛应用于服务器、嵌入式设备、个人电脑等各种场景。本文将介绍 Linux 操作系统的基本概念、命令行操作、文件系统、软件包管理等内容,并提供两个示例说明。 基本概念 Linux 操作系统是一个多用户、多任务、分时操作系统。 Linux 内核是操作系统的核心,提供了硬件与软件之间的接口…

    其他 2023年4月16日
    00
  • Android实现可滑动的自定义日历控件

    Android实现可滑动的自定义日历控件攻略 1. 概述 在Android中实现可滑动的自定义日历控件可以提供用户友好的日历浏览体验。本攻略将介绍一种实现方法,使用RecyclerView和自定义Adapter来展示日历,并通过手势监听实现滑动功能。 2. 步骤 2.1 创建项目和布局文件 首先,创建一个新的Android项目,并在布局文件中添加一个Recy…

    other 2023年9月6日
    00
  • 南湖书院-景点介绍

    南湖书院-景点介绍攻略 南湖书院是中国南京市鼓楼区的一处历史文化景点,建于明朝洪武年间,是明朝著名学者王守仁的故居。以下是南湖书院的完整攻略: 步骤一:了解南湖书院的历史和文化 南湖书院是明朝著名学者王守仁的故居,也是明朝时期南京城内最大的私塾之一。南湖书院以其深厚的文化底蕴和优美的园林环境而闻名于世在南湖书院,客可以了解到明朝时期的文化和教育,以及王守仁的…

    other 2023年5月9日
    00
  • Inlay技术要求

    下面是 Inlay 技术要求的完整攻略,包括基本原理、实现方法和两个示例说明。 基本原理 Inlay 技术是一种将芯片嵌入 PCB 板中的技术,可以将芯片和 PCB 板集成在一起,从而实现更小、更轻、更高效的电子产品。Inlay 技术的基本原理是将芯片嵌入 PCB 板中,然后通过封装和连接技术将芯片与 PCB 板连接起来。 实现方法 实现 Inlay 技术的…

    other 2023年5月5日
    00
  • Win11重启一直转圈圈进不去系统怎么办?Win11重启转圈圈两种解决方法

    针对Win11重启一直转圈圈进不去系统这个问题,一般情况下可以采取以下两种解决方法: 方法一:检查系统文件和驱动程序 第一种解决方法是检查系统文件和驱动程序是否出现问题,以及是否需要更新。具体步骤如下: 进入Win11的“设置”界面。 点击“更新和安全”选项。 点击“还原”选项。 点击“开始”按钮,然后按照提示操作。 示例:用户小张遇到了Win11重启转圈圈…

    other 2023年6月27日
    00
  • QT开发应用程序的欢迎界面实例

    非常高兴能为你讲解“QT开发应用程序的欢迎界面实例”的完整攻略。 开发应用程序时,欢迎界面是非常重要的。它是用户界面的第一印象,可以吸引用户的注意力,提高用户体验。本攻略将向你展示如何使用QT框架创建一个漂亮的欢迎界面。 一、创建项目 1.新建一个QtWidgets应用程序项目。 2.在新项目向导的“项目配置”页面,勾选“创建欢迎界面”选项,并指定其为“Ma…

    other 2023年6月25日
    00
合作推广
合作推广
分享本页
返回顶部