
本文共 18701 字,大约阅读时间需要 62 分钟。
���������������������������������������������������������������������������������YARNRunner.submitJob()
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]
���������������������YARNRunner.submitJob()������������
������������������YARNRunner���������������������������������
package org.apache.hadoop.mapred;public class YARNRunner implements ClientProtocol { private ResourceMgrDelegate resMgrDelegate; //������RM������������������������������������ private ClientCache clientCache; private Configuration conf; private final FileContext defaultFileContext; public YARNRunner(Configuration conf) {//������������������������������������,��������������������������������� this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf))); } public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {//������������ClientCache this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate)); } public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, ClientCache clientCache) {//��������������������������� this.conf = conf; try { this.resMgrDelegate = resMgrDelegate; this.clientCache = clientCache; this.defaultFileContext = FileContext.getFileContext(this.conf); } catch (UnsupportedFileSystemException ufe) { throw new RuntimeException("Error in instantiating YarnClient", ufe); } }public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws IOException, InterruptedException { addHistoryToken(ts);//���������������������������,������������������(JobHistory)��������� // Construct necessary information to start the MR AM //������MR AM��������������������� //������������ApplicationSubmissionContext,������conf������������������������������ ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { /* ���������������������������������(ResourceManager)*///RM���������������������������������,������������ContainerLaunchContext���������������NM������//���,���������������������shell���������,������������Java���������,������������MRAppMaster.class���//������������,������ApplicationSubmissionContext������appContext,���������������������ResourceManager//���������������������ApplicationMaster������������������������ ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); }}}
������createApplicationSubmissionContext������������������
1���������������������������������1536M���cpu���core���12���������������������������������������������������jar������3���������������������tokens4���������������AM���������5���������map���reduce���������������6���������������CLASSPATH���7������AM���container������ContainerLaunchContext8���������ApplicationSubmissionContext9���������MRAppMaster������������������������������conf������������������������������������������������������������������������������������������������������������������������������������������������ApplicationMaster������������������������������������Shell(������bash)������������������������������������������������������������������������������������������ResourceMgrDelegate.submitApplication������������������������������������ResourceMgrDelegate������������
public class ResourceMgrDelegate extends YarnClient { private YarnConfiguration conf; private ApplicationSubmissionContext application; private ApplicationId applicationId; protected YarnClient client;//������������YarnClientImpl������������,������������YarnClient������������������ private Text rmDTService; //������ResourceMgrDelegate��������������� public ResourceMgrDelegate(YarnConfiguration conf) { super(ResourceMgrDelegate.class.getName()); this.conf = conf; //������YarnClient������client //YarnClient.createYarnClient()������������YarnClientImpl this.client = YarnClient.createYarnClient(); init(conf);//���������AbstractService������������,YarnClient������AbstractService��������� start();//������������AbstractService������������ }public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { return client.submitApplication(appContext);//������YarnClientImpl.submitApplication������ }
������������������������������������������������
ResourceMgrDelegate������������YARNRunner���������������������������������YARNRunner,������������������Cluster.Initialize()������������������������������,���Cluster���������������������������connect()���������������������,������������������,���������������������connect(),������������������������������������,���������������������Cluster���������,������������������YARNRunner������,���������������ResourceMgrDelegate������,������������������������������YarnClientImpl���������
������������������������������������������������
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication()]
������������������������YarnClientImpl.submitApplication()���������
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } //������������SubmitApplicationRequestPBImpl��������������� SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext);//������������������������Context // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request);//������������������������ int pollCount = 0; long startTime = System.currentTimeMillis(); EnumSetwaitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED); while (true) { try { //������������RM���������������������������,������������������������������������ ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break;//���������������������������,������while������ } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { String msg = "Interrupted while waiting for application " + applicationId + " to be successfully submitted."; LOG.error(msg); throw new YarnException(msg, ie); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request);//������������������������ } } return applicationId; }
������������������������������rmClient.submitApplication(request)������������������rmClient���������������������������������������������YarnClientImpl���������������������������
public class YarnClientImpl extends YarnClient { private static final Log LOG = LogFactory.getLog(YarnClientImpl.class); protected ApplicationClientProtocol rmClient; protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; private long asyncApiPollTimeoutMillis; protected AHSClient historyClient; private boolean historyServiceEnabled; protected TimelineClient timelineClient; @VisibleForTesting Text timelineService; @VisibleForTesting String timelineDTRenewer; protected boolean timelineServiceEnabled; protected boolean timelineServiceBestEffort; private static final String ROOT = "root"; public YarnClientImpl() { super(YarnClientImpl.class.getName()); }
������������������rmClient���������ApplicationClientProtocol���������������������������������������������������������ApplicationClientProtocolPBClientImpl ���������������������������������������
public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol, Closeable { private ApplicationClientProtocolPB proxy; public ApplicationClientProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {//���������������rpc.engine.ApplicationClientProtocolPB������������ProtobufRpcEngine RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class, ProtobufRpcEngine.class);//������proxy//������proxy���������������������������������������������������������JVM���,���������������//ResourceManager,������������NodeManager,���������������������Java���������,������������������//��������������������������� proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf); } public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException {//���������request������������������������(message)������ SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl) request).getProto(); try {//������proxy���������������������,������������������������ //���������������������������SubmitApplicationResponsePBImpl������ return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto)); } catch (ServiceException e) { RPCUtil.unwrapAndThrowException(e); return null; }} }
ApplicationClientProtocolPBClientImpl���submitApplication���������������������������������proxy.submitApplication������������proxy���������������������������������
������������proxy���������SubmitApplicationRequest,������RM������������������,���������������������������������������������������TCP���������������������RM������������������������������,������������
ProtoBuf,���������TCP������������������������������������������������������,���������������������������ApplicationClientProtocolPB���������ApplicationClientProtocolPBServiceImpl,ProtoBuf������������������������������������������������submitApplication()���������,Client������������ApplicationClientProtocolPBClientImpl������������������������������������Server������������applicationClientProtocolPBServiceImpl������������������������������������������,Server���������������������������������������������Client������������������,���������������������������������RPC���������������,Client/Server���������������������������������������������������������������,���������������ApplicationClientProtocolPB���Client���
YARNRunner.submitJob() //������������������������������ResourceMgrDelegate.submitApplication() //������RM���������YarnClientImpl.submitApplication() //YARN���������Client������ApplicationClientProtocolPBClientImpl.submitApplication()//ApplicationClientProtocol������proxy.submitApplication() //ApplicationClientProtocolPB������Protocol���������������submitApplication() //���TCP/IP������������������������������������Socket���TCP/IP //������������������������������
Server������
Server���������������������������Server���������,���������������������������������������������������,���������������������������Socket���TCP/IP���������������������������������������,���������������������������������������������������������������������TCP/IP������������������������������,������������������������������������������������������������������������,���������������������������������������������������������������������������������������������������������tcp/ip���������������ApplicationClientProtocolPBServiceImpl.submitApplication()���������
public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB { private ApplicationClientProtocol real; public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) { this.real = impl; } public SubmitApplicationResponseProto submitApplication(RpcController arg0, SubmitApplicationRequestProto proto) throws ServiceException { SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);//������������������ try { SubmitApplicationResponse response = real.submitApplication(request); //real���ClientRMService��������� ���������������RM���������������createClientRMService() ������������ return ((SubmitApplicationResponsePBImpl)response).getProto(); } catch (YarnException e) { throw new ServiceException(e); } catch (IOException e) { throw new ServiceException(e); }}}
���������������ClientRMService.submitApplication(request); ������
public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName();//������������ } catch (IOException ie) { LOG.warn("Unable to get the current user.", ie); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, ie.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw RPCUtil.getRemoteException(ie); } // Check whether app has already been put into rmContext, // If it is, simply return the response//��������������������������������������������������������������� if (rmContext.getRMApps().get(applicationId) != null) { LOG.info("This is an earlier submitted application: " + applicationId); return SubmitApplicationResponse.newInstance(); } //������������������������������������������������ if (submissionContext.getQueue() == null) { submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); } //������������������application��������������������������������������� if (submissionContext.getApplicationName() == null) { submissionContext.setApplicationName( YarnConfiguration.DEFAULT_APPLICATION_NAME); } //���������������������������������������������������yarn������ if (submissionContext.getApplicationType() == null) { submissionContext .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE); } else { if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) { submissionContext.setApplicationType(submissionContext .getApplicationType().substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH)); } } try { // call RMAppManager to submit application directly rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user);//���������������rmAppManager������ LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId); } catch (YarnException e) { LOG.info("Exception in submitting application with id " + applicationId.getId(), e); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw e; } SubmitApplicationResponse response = recordFactory .newRecordInstance(SubmitApplicationResponse.class); return response;}
���������������������������,��������������� RM ������������RMAppManagers. ubmitApplication(),������������������������������ ������������������������,������ RM������������������������������������������������
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication() ]
发表评论
最新留言
关于作者
