
Flink Analysis and Usage, Part 4: Job Startup
1. Introduction
Jobs in Flink are managed by the JobManager (in older versions) or by the JobMaster. In the old model, Flink used the JobManager to coordinate between the Client and the TaskManagers: the Client submitted a JobGraph to the JobManager, the JobManager turned the JobGraph into an ExecutionGraph, and the resulting work was distributed to the TaskManagers for execution. Starting with FLIP-6, the JobMaster takes over this role: the Flink Dispatcher hands the JobGraph to a JobManagerRunner, which passes it to a JobMaster; the JobMaster then converts the JobGraph into an ExecutionGraph and hands the work to the TaskManagers.
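To make the FLIP-6 chain easier to follow, here is a minimal, self-contained sketch of the flow just described. All class names are simplified stand-ins for illustration only, not Flink's real Dispatcher/JobManagerRunner/JobMaster types.

// Conceptual sketch only: simplified stand-in classes, not Flink's real types.
final class JobGraphStub {
    final String jobId;
    JobGraphStub(String jobId) { this.jobId = jobId; }
}

final class JobMasterStub {
    // In Flink, the JobMaster turns the JobGraph into an ExecutionGraph and drives its execution on TaskManagers.
    String buildExecutionGraph(JobGraphStub jobGraph) {
        return "ExecutionGraph(" + jobGraph.jobId + ")";
    }
}

final class JobManagerRunnerStub {
    private final JobMasterStub jobMaster = new JobMasterStub();
    void run(JobGraphStub jobGraph) {
        System.out.println("built " + jobMaster.buildExecutionGraph(jobGraph));
    }
}

public class SubmissionFlowSketch {
    public static void main(String[] args) {
        // The Dispatcher's role: accept the submitted JobGraph and hand it to a per-job JobManagerRunner.
        JobGraphStub jobGraph = new JobGraphStub("job-0001");
        new JobManagerRunnerStub().run(jobGraph);
    }
}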
2. Script Startup
Job startup begins with the startup script, whose relevant code is as follows:
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
This was analyzed earlier: depending on the arguments, it starts either an interactive foreground console or a daemon process. The two paths are essentially the same, so taking the daemon as an example:
case $DAEMON in
    ......
    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;
From this we can see that it runs the class org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint. Let's step into this class and see what it really does:
/**
 * {@link JobClusterEntrypoint} which is started with a job in a predefined
 * location.
 */
public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {

    @Nonnull
    private final JobID jobId;

    @Nonnull
    private final SavepointRestoreSettings savepointRestoreSettings;

    @Nonnull
    private final String[] programArguments;

    @Nullable
    private final String jobClassName;

    private StandaloneJobClusterEntryPoint(
            Configuration configuration,
            @Nonnull JobID jobId,
            @Nonnull SavepointRestoreSettings savepointRestoreSettings,
            @Nonnull String[] programArguments,
            @Nullable String jobClassName) {
        super(configuration);
        this.jobId = requireNonNull(jobId, "jobId");
        this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
        this.programArguments = requireNonNull(programArguments, "programArguments");
        this.jobClassName = jobClassName;
    }

    @Override
    protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        return new JobDispatcherResourceManagerComponentFactory(
            StandaloneResourceManagerFactory.INSTANCE,
            new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
    }

    public static void main(String[] args) {
        // startup checks and logging
        ......
        StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
            configuration,
            clusterConfiguration.getJobId(),
            clusterConfiguration.getSavepointRestoreSettings(),
            clusterConfiguration.getArgs(),
            clusterConfiguration.getJobClassName());

        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
}

// ClusterEntrypoint.runCluster(), reached via runClusterEntrypoint(entrypoint):
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        initializeServices(configuration); // this is where the haServices discussed below gets created

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        // calls the factory method overridden in StandaloneJobClusterEntryPoint;
        // StandaloneSessionClusterEntrypoint has a similar implementation
        final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

        clusterComponent = dispatcherResourceManagerComponentFactory.create(
            configuration,
            commonRpcService,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            archivedExecutionGraphStore,
            new AkkaQueryServiceRetriever(
                metricQueryServiceActorSystem,
                Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
            this);

        clusterComponent.getShutDownFuture().whenComplete(
            (ApplicationStatus applicationStatus, Throwable throwable) -> {
                if (throwable != null) {
                    shutDownAsync(
                        ApplicationStatus.UNKNOWN,
                        ExceptionUtils.stringifyException(throwable),
                        false);
                } else {
                    // This is the general shutdown path. If a separate more specific shutdown was
                    // already triggered, this will do nothing
                    shutDownAsync(
                        applicationStatus,
                        null,
                        true);
                }
            });
    }
}
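The tail of runCluster() above relies on a CompletableFuture-based shutdown pattern: the cluster component exposes a shut-down future, and the entrypoint reacts differently to normal completion and to failure. Below is a minimal sketch of that pattern; the class and method names are illustrative, not Flink's.

import java.util.concurrent.CompletableFuture;

public class ShutdownFutureSketch {

    enum ApplicationStatus { SUCCEEDED, UNKNOWN }

    public static void main(String[] args) {
        CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

        shutDownFuture.whenComplete((status, throwable) -> {
            if (throwable != null) {
                // An error terminated the cluster: shut down with UNKNOWN status and the error message.
                System.out.println("shutDownAsync(UNKNOWN): " + throwable.getMessage());
            } else {
                // Normal path: shut down with whatever status the component reported.
                System.out.println("shutDownAsync(" + status + ")");
            }
        });

        // Simulate the cluster component signalling a normal shutdown.
        shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
    }
}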
Both this entrypoint and the StandaloneSessionClusterEntrypoint discussed earlier eventually go through runClusterEntrypoint and then runCluster. Inside that method a large number of services and related resources (such as thread pools) are started. Among them is haServices, whose interface looks like this:
public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the JobMaster which is responsible for the given job.
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                                 a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager.
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager.
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    ......
}
Judging from its contents, most of these services relate to jobs and to leader election/retrieval, so the job startup path clearly has to go through the services provided here.
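As a quick illustration of why non-HA setups can get away without real elections: the standalone "retriever" simply reports a pre-configured leader address together with DEFAULT_LEADER_ID. The types below are simplified stand-ins for the idea, not Flink's StandaloneLeaderRetrievalService.

import java.util.UUID;

public class FixedLeaderRetrievalSketch {

    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    // Same value as HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    static class FixedLeaderRetrievalService {
        private final String leaderAddress;

        FixedLeaderRetrievalService(String leaderAddress) {
            this.leaderAddress = leaderAddress;
        }

        // "Retrieval" in standalone mode is trivial: immediately report the configured leader.
        void start(LeaderListener listener) {
            listener.notifyLeaderAddress(leaderAddress, DEFAULT_LEADER_ID);
        }
    }

    public static void main(String[] args) {
        new FixedLeaderRetrievalService("akka.tcp://flink@localhost:6123/user/dispatcher")
            .start((address, id) -> System.out.println("leader: " + address + " session: " + id));
    }
}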
3. Starting the JobManager
As mentioned at the beginning, jobs are distributed through the Dispatcher. Let's look at the creation-related code:
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    return new JobDispatcherResourceManagerComponentFactory(
        StandaloneResourceManagerFactory.INSTANCE,
        new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
}

public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
    INSTANCE;

    @Override
    public ResourceManager<ResourceID> createResourceManager(
            Configuration configuration,
            ResourceID resourceId,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            FatalErrorHandler fatalErrorHandler,
            ClusterInformation clusterInformation,
            @Nullable String webInterfaceUrl,
            JobManagerMetricGroup jobManagerMetricGroup) throws Exception {
        final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration =
            ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
        final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
            resourceManagerRuntimeServicesConfiguration,
            highAvailabilityServices,
            rpcService.getScheduledExecutor());

        return new StandaloneResourceManager(
            rpcService,
            getEndpointId(),
            resourceId,
            highAvailabilityServices,
            heartbeatServices,
            resourceManagerRuntimeServices.getSlotManager(),
            metricRegistry,
            resourceManagerRuntimeServices.getJobLeaderIdService(),
            clusterInformation,
            fatalErrorHandler,
            jobManagerMetricGroup);
    }
}

// explained further below
public class JobDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<MiniDispatcher, RestfulGateway> {

    public JobDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory, @Nonnull JobGraphRetriever jobGraphRetriever) {
        super(new JobDispatcherFactory(jobGraphRetriever), resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
    }

    @Override
    protected DispatcherResourceManagerComponent<MiniDispatcher> createDispatcherResourceManagerComponent(
            MiniDispatcher dispatcher,
            ResourceManager<?> resourceManager,
            LeaderRetrievalService dispatcherLeaderRetrievalService,
            LeaderRetrievalService resourceManagerRetrievalService,
            WebMonitorEndpoint<?> webMonitorEndpoint,
            JobManagerMetricGroup jobManagerMetricGroup) {
        return new JobDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);
    }
}

// creation of the HA services
protected HighAvailabilityServices createHaServices(
        Configuration configuration,
        Executor executor) throws Exception {
    return HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        executor,
        HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
}

public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {

    HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

            final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                JobMaster.JOB_MANAGER_NAME,
                addressResolution,
                configuration);
            final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                ResourceManager.RESOURCE_MANAGER_NAME,
                addressResolution,
                configuration);
            final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                Dispatcher.DISPATCHER_NAME,
                addressResolution,
                configuration);

            final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                "%s must be set",
                RestOptions.ADDRESS.key());
            final int port = configuration.getInteger(RestOptions.PORT);
            final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
            final String protocol = enableSSL ? "https://" : "http://";

            return new StandaloneHaServices(
                resourceManagerRpcUrl,
                dispatcherRpcUrl,
                jobManagerRpcUrl,
                String.format("%s%s:%s", protocol, address, port));
        case ZOOKEEPER:
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration),
                executor,
                configuration,
                blobStoreService);
        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
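For the NONE branch above, the three RPC URLs that end up in StandaloneHaServices are plain Akka actor paths built from the configured JobManager host and port. The sketch below only approximates the format; the authoritative construction lives in AkkaRpcServiceUtils.getRpcUrl.

public class RpcUrlSketch {

    // Approximate shape of the standalone RPC URLs, e.g. akka.tcp://flink@localhost:6123/user/dispatcher
    static String rpcUrl(String host, int port, String endpointName) {
        return String.format("akka.tcp://flink@%s:%d/user/%s", host, port, endpointName);
    }

    public static void main(String[] args) {
        System.out.println(rpcUrl("localhost", 6123, "jobmanager"));
        System.out.println(rpcUrl("localhost", 6123, "resourcemanager"));
        System.out.println(rpcUrl("localhost", 6123, "dispatcher"));
    }
}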
Peeling back the layers, the creation of the Dispatcher appears here; once it exists, jobs can be routed through it to the tasks that execute them. Now let's look at the final create():
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Nonnull
    private final DispatcherFactory<T> dispatcherFactory;

    @Nonnull
    private final ResourceManagerFactory<?> resourceManagerFactory;

    @Nonnull
    private final RestEndpointFactory<U> restEndpointFactory;

    public AbstractDispatcherResourceManagerComponentFactory(
            @Nonnull DispatcherFactory<T> dispatcherFactory,
            @Nonnull ResourceManagerFactory<?> resourceManagerFactory,
            @Nonnull RestEndpointFactory<U> restEndpointFactory) {
        this.dispatcherFactory = dispatcherFactory;
        this.resourceManagerFactory = resourceManagerFactory;
        this.restEndpointFactory = restEndpointFactory;
    }

    // this is the implementation of create(); note how many related services it wires up
    @Override
    public DispatcherResourceManagerComponent<T> create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        WebMonitorEndpoint<U> webMonitorEndpoint = null;
        ResourceManager<?> resourceManager = null;
        JobManagerMetricGroup jobManagerMetricGroup = null;
        T dispatcher = null;

        try {
            dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

            resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));

            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                10,
                Time.milliseconds(50L));

            final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

            final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher = updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                    configuration,
                    metricQueryServiceRetriever,
                    dispatcherGatewayRetriever,
                    executor);

            webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getWebMonitorLeaderElectionService(),
                fatalErrorHandler);

            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();

            final String hostname = getHostname(rpcService);

            jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                hostname,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                jobManagerMetricGroup);

            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

            dispatcher = dispatcherFactory.createDispatcher(
                configuration,
                rpcService,
                highAvailabilityServices,
                resourceManagerGatewayRetriever,
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServicePath(),
                archivedExecutionGraphStore,
                fatalErrorHandler,
                historyServerArchivist);

            log.debug("Starting ResourceManager.");
            resourceManager.start();
            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

            log.debug("Starting Dispatcher.");
            dispatcher.start();
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

            return createDispatcherResourceManagerComponent(
                dispatcher,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                jobManagerMetricGroup);

        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }

            if (resourceManager != null) {
                terminationFutures.add(resourceManager.closeAsync());
            }

            if (dispatcher != null) {
                terminationFutures.add(dispatcher.closeAsync());
            }

            final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (jobManagerMetricGroup != null) {
                jobManagerMetricGroup.close();
            }

            throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }

    protected String getHostname(RpcService rpcService) {
        final String rpcServiceAddress = rpcService.getAddress();
        return rpcServiceAddress != null && rpcServiceAddress.isEmpty()
            ? "localhost"
            : rpcServiceAddress;
    }

    protected abstract DispatcherResourceManagerComponent<T> createDispatcherResourceManagerComponent(
        T dispatcher,
        ResourceManager<?> resourceManager,
        LeaderRetrievalService dispatcherLeaderRetrievalService,
        LeaderRetrievalService resourceManagerRetrievalService,
        WebMonitorEndpoint<?> webMonitorEndpoint,
        JobManagerMetricGroup jobManagerMetricGroup);
}
4. Analysis
From the code flow above we can see that job startup actually splits into two major parts:
1. Creation of the various resources, mainly in createDispatcherResourceManagerComponentFactory, including the related factory and interface implementations.
2. On top of part 1, the concrete implementations of those interfaces and the generation and startup of the related services, as sketched below.
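A compact way to picture these two parts is the factory split in the following sketch: phase 1 only decides which factory to use, phase 2 lets the generic runCluster() path call create() on it with the shared services. The names are hypothetical; only the two-phase shape mirrors Flink's design.

public class TwoPhaseStartupSketch {

    interface ComponentFactory {
        String create(String sharedServices);
    }

    // Phase 1: the concrete entrypoint decides which factory (and thus which Dispatcher flavour) to use.
    static ComponentFactory createComponentFactory() {
        return sharedServices -> "MiniDispatcher + StandaloneResourceManager + RestEndpoint using " + sharedServices;
    }

    public static void main(String[] args) {
        ComponentFactory factory = createComponentFactory();
        // Phase 2: the generic cluster startup hands the shared services to create(), which wires and starts everything.
        System.out.println(factory.create("rpcService, haServices, blobServer, ..."));
    }
}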
The next article will analyze this in detail.
5. Summary
Writing this part ran into many detailed questions, especially about how the various stages of a job relate to each other; it really pays to draw careful diagrams for the analysis. The next article will explain this in detail.
