Complete Guide: Source Code Analysis of the Flink Deployment Cluster Architecture
Introduction
Apache Flink is an open-source stream processing engine that delivers efficient, accurate, low-latency data processing and analytics. This article walks through the source code behind Flink's deployment cluster architecture and aims to give readers a comprehensive technical guide.
Source Code Analysis of the Deployment Cluster Architecture
Architecture Overview
Flink's overall architecture consists of three layers: the Client, the JobManager and the TaskManagers. The Client is the frontend used to submit jobs; the JobManager is the control center, responsible for job scheduling, state management and so on; the TaskManagers carry out the actual data processing.
The Client talks to the JobManager over RPC, and the JobManager likewise communicates with the TaskManagers over RPC. The JobManager periodically exchanges heartbeats with the TaskManagers to verify that they are still alive.
Around these three roles, the data path introduces further components (data sinks, buffers and the network stack) that decouple the TaskManagers from external data sources and absorb network latency; see the configuration sketch below.
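Both the heartbeat cadence and the network buffer pool are tunable in flink-conf.yaml. The values below are merely the illustrative defaults of the Flink versions this article appears to target, not recommendations:

heartbeat.interval: 10000                  # ms between heartbeat requests
heartbeat.timeout: 50000                   # ms without a response before a peer is considered dead
taskmanager.network.memory.fraction: 0.1   # fraction of JVM memory reserved for network buffers
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb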
Code Analysis
Let's start with Flink's overall startup flow, entered through CliFrontend (lightly cleaned up here):
public static void main(String[] args) throws Exception {
    // ...
    Configuration configuration = ...
    // Create a CliFrontend from the loaded configuration
    final CliFrontend cli = new CliFrontend(configuration);
    // Run the CLI inside the installed security context
    int retCode = SecurityUtils.getInstalledContext().runSecured(
            new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return cli.run(args);
                }
            });
    // ...
}
The CliFrontend class implements the client frontend; it is what submits jobs to the JobManager.
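In practice this frontend is usually invoked through the flink CLI; for example (the main class and JAR path are placeholders):

./bin/flink run -c com.example.WordCount /path/to/word-count.jar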
Once CliFrontend has started, it hands the job to the JobManager through a JobClient. The snippet below is a simplified sketch of that hand-off:
// Build the JobGraph from the user program
JobGraph jobGraph = ...
// Submit the JobGraph through the JobClient
JobClient jobClient = JobClient.submitJobAndWaitForResult(clusterClient, jobGraph, false);
JobClient.submitJobAndWaitForResult() submits the job to the JobManager and blocks until the job completes.
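Note that the client-side API has changed across Flink versions. As a point of reference, in recent versions (1.10+) a program can also submit itself without blocking through the public JobClient interface; a minimal sketch:

// inside main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... build the pipeline ...

// Returns as soon as the JobManager has accepted the job
JobClient jobClient = env.executeAsync("example-job");
System.out.println("Submitted job " + jobClient.getJobID());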
Inside JobClient.startJobExecution(), a JobSubmissionMessage is first assembled and sent to the JobManager (again in simplified form):
private JobSubmissionResult startJobExecution(
        JobGraph jobGraph,
        boolean detached) throws Exception {
    ExecutionConfigAccessor executionConfigAccessor =
            new ExecutionConfigAccessor(jobGraph.getExecutionConfig());
    JobSubmissionMessage jobSubmissionMessage = new JobSubmissionMessage(
            jobGraph,
            executionConfigAccessor.getCheckpointConfig(),
            executionConfigAccessor.getSavepointRestoreSettings(),
            uploadedJarFileBlobKeys,
            detached);
    final CompletableFuture<Acknowledge> submissionResultFuture =
            jobManagerGateway.submitJob(jobSubmissionMessage, timeout);
    ...
}
When the JobManager receives the JobSubmissionMessage, it initializes the job in its submitJob() method. The listing below is a condensed, lightly cleaned-up sketch of that logic rather than a line-for-line copy of the source:
@Override
public CompletableFuture<Acknowledge> submitJob(final JobSubmissionMessage jobSubmissionMessage, final Time timeout) {
    try {
        // Upload the user-code artifacts (JARs) to the blob store before anything else
        if (jobSubmissionMessage.getUserCodeBlobKeys().length > 0) {
            Future<Iterable<Path>> uploadResultFuture =
                    userJobClassLoader.uploadUserArtifacts(jobSubmissionMessage.getUserCodeBlobKeys());
            uploadResultFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = new CompletableFuture<>();
        // Run the submission logic on the endpoint's main thread
        runAsync(() -> {
            JobGraph jobGraph = jobSubmissionMessage.getJobGraph();
            jobGraph.setAllowQueuedScheduling(jobSubmissionMessage.getDetachedMode());
            final JobID jobId = jobGraph.getJobID();
            // Resolve the user-code class loader for this job
            classLoaderLeaseRetriever
                    .retrieveClassLoaderLease(jobSubmissionMessage.getUserCodeBlobKeys())
                    .thenAccept(classLoader -> {
                        UserCodeClassLoaderFactory.DelegationResource delegationResource =
                                new UserCodeClassLoaderFactory.DelegationResource(
                                        classLoader,
                                        jobGraph.getClasspaths(),
                                        jobGraph.getGlobalClasspaths(),
                                        jobGraph.getClassLoaderResolveOrder(),
                                        jobGraph.getUserClassLoaderParentFirstPatterns());
                        classLoaderUserCodes.put(jobId, delegationResource);
                        // Lazily create the slot pool that requests task slots for this job
                        if (slotPoolContainer == null) {
                            slotPoolContainer = slotPoolService.createSlotPool(jobId);
                            slotPoolOwner = slotPoolContainer.getSlotPoolOwner();
                        }
                        // Create the JobMaster that will drive the execution of this job
                        // (the real code also registers a cleanup hook that closes the metric
                        // group, releases the class-loader lease and closes the slot pool)
                        final JobMaster jobMaster = jobManagerServices.createJobMaster(
                                jobGraph,
                                ioExecutor,
                                slotPoolContainer,
                                leaderElectionService,
                                checkpointer,
                                classLoader,
                                delegationResource.getClasspaths(),
                                delegationResource.getGlobalClasspaths(),
                                delegationResource.getClassLoaderResolveOrder(),
                                delegationResource.getUserClassLoaderParentFirstPatterns(),
                                blobServer,
                                eventPublisher,
                                rpcTimeout,
                                ioExecutor.getCheckpointRecoveryFactory(),
                                schedulerOperatorFactory,
                                jobManagerJobMetricGroup.addGroup("JobMaster").getIOMetricGroup(),
                                isTimeTrackingMetricsEnabled(),
                                gatewayRetriever,
                                partitionTrackerFactory,
                                partitionCoordinator,
                                accumulatorProvider,
                                expireDeploymentsListeners);
                        // Start the JobMaster asynchronously and track its termination
                        CompletableFuture<Void> jobManagerTerminationFuture = new CompletableFuture<>();
                        ExceptionUtils.runJobAsync(
                                        () -> runJobMaster(jobMaster, jobManagerTerminationFuture),
                                        "JobManager (master)",
                                        LOG)
                                .whenCompleteAsync(
                                        (Void ignored, Throwable throwable) -> {
                                            if (throwable != null) {
                                                jobManagerTerminationFuture.completeExceptionally(throwable);
                                            } else {
                                                jobManagerTerminationFuture.complete(null);
                                            }
                                        },
                                        getMainThreadExecutor());
                        // Acknowledge a successful submission, or propagate the failure
                        jobManagerTerminationFuture.whenComplete(
                                (Void ignored, Throwable throwable) -> {
                                    if (throwable != null) {
                                        LOG.error("JobManager failed with " + throwable.getMessage(), throwable);
                                        try {
                                            getSelfGateway(JobMasterGateway.class).closeAsync();
                                        } catch (Throwable t) {
                                            LOG.warn("Error closing JobManager after initialization failure.", t);
                                        }
                                        acknowledgeCompletableFuture.completeExceptionally(throwable);
                                    } else {
                                        acknowledgeCompletableFuture.complete(Acknowledge.get());
                                    }
                                });
                    });
        });
        return acknowledgeCompletableFuture;
    } catch (Throwable t) {
        ExceptionUtils.rethrow(t);
        return null; // silence the compiler
    }
}
Inside the JobMaster, the JobGraph is translated into an ExecutionGraph. Each vertex of the ExecutionGraph is split into parallel subtasks according to its parallelism, and each subtask is deployed as a Task into a slot on some TaskManager.
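A minimal illustration of how parallelism controls the number of subtasks per operator:

// inside main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // by default, every operator of this job runs as 4 subtasks

DataStream<String> lines = env.socketTextStream("localhost", 9000); // socket sources are non-parallel
lines.map(String::toUpperCase)
        .setParallelism(2) // override: this map operator runs as only 2 subtasks
        .print();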
If a Task running on a TaskManager fails, the TaskManager sends a FailTaskExecutionAttempt message to its JobManager so that the task can be re-executed.
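Whether and how often a failed task is re-executed is governed by the job's restart strategy; for example (Time here is org.apache.flink.api.common.time.Time):

// inside main(), before defining the pipeline:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Restart the job up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                  // max number of restart attempts
        Time.seconds(10))); // delay between attempts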
Example 1
The following is a simple Flink program that counts how often each word occurs in its input, shown here as a complete, runnable class:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.socketTextStream("localhost", 9000);
        DataStream<Tuple2<String, Integer>> counts = lines
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        // Emit (word, 1) for every whitespace-separated token
                        for (String word : value.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)  // key by the word (tuple field 0)
                .sum(1);   // sum the counts (tuple field 1)
        counts.print();
        env.execute("Word Count");
    }
}
The program reads text from a TCP socket, splits each line into words and counts the occurrences of each word.
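To try it locally, first start a text source with netcat, then run the job and type lines into the netcat session; the running counts are printed to the TaskManager's stdout:

nc -lk 9000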
Example 2
Below is a more elaborate Flink program for online real-time recommendation (UserActionSource, UserAction, UserEvent, ItemEvent and RecommendationFunction are user-defined classes that are not shown):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Raw user actions from a custom source (user-defined class)
DataStream<UserAction> rawUserActions = env.addSource(new UserActionSource());

// Keep only page views and map them to user events
DataStream<UserEvent> userEvents = rawUserActions
        .filter(new FilterFunction<UserAction>() {
            @Override
            public boolean filter(UserAction action) {
                return action.getType() == UserActionType.PAGE_VIEW;
            }
        })
        .map(new MapFunction<UserAction, UserEvent>() {
            @Override
            public UserEvent map(UserAction action) {
                return new UserEvent(action.getUserId(), action.getItemId(), action.getTimestamp());
            }
        });

// Keep only item updates and map them to item events
DataStream<ItemEvent> itemEvents = rawUserActions
        .filter(new FilterFunction<UserAction>() {
            @Override
            public boolean filter(UserAction action) {
                return action.getType() == UserActionType.ITEM_UPDATE;
            }
        })
        .map(new MapFunction<UserAction, ItemEvent>() {
            @Override
            public ItemEvent map(UserAction action) {
                return new ItemEvent(action.getItemId(), action.getTimestamp());
            }
        });

// Recommend per user over a sliding 5-minute window that advances every minute.
// NOTE: event-time windows require timestamps and watermarks to be assigned,
// e.g. via assignTimestampsAndWatermarks(...), which is omitted here.
DataStream<Tuple2<Integer, String>> recommendations = userEvents
        .keyBy(new KeySelector<UserEvent, Integer>() {
            @Override
            public Integer getKey(UserEvent event) {
                return event.getUserId();
            }
        })
        .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
        .apply(new RecommendationFunction(itemEvents));

recommendations.print();
env.execute("Online Real-time Recommendation");
The program consumes user actions, covering PAGE_VIEW and ITEM_UPDATE events, from UserActionSource. Whenever new PAGE_VIEW events arrive, it uses the user's behavior over the past 5 minutes to recommend suitable items; for each user, the most recent recommendation is emitted.
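Since RecommendationFunction is not shown above, here is a purely hypothetical sketch of what it might look like as a WindowFunction. It ignores the item stream passed to its constructor and simply recommends the item from the user's most recent page view; the UserEvent getters are assumptions:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical sketch: keyed by user id, emits one (userId, itemId) recommendation per window
public class RecommendationFunction
        implements WindowFunction<UserEvent, Tuple2<Integer, String>, Integer, TimeWindow> {

    @Override
    public void apply(Integer userId,
                      TimeWindow window,
                      Iterable<UserEvent> events,
                      Collector<Tuple2<Integer, String>> out) {
        // Naive strategy: recommend the item from the user's most recent page view
        String lastItem = null;
        long lastTimestamp = Long.MIN_VALUE;
        for (UserEvent event : events) {
            if (event.getTimestamp() > lastTimestamp) {
                lastTimestamp = event.getTimestamp();
                lastItem = event.getItemId();
            }
        }
        if (lastItem != null) {
            out.collect(new Tuple2<>(userId, lastItem));
        }
    }
}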