hadoop2.7之作业提交详解(下)
发布日期:2021-05-09 09:32:51 浏览次数:15 分类:博客文章

本文共 18701 字,大约阅读时间需要 62 分钟。

���������������������������������������������������������������������������������YARNRunner.submitJob()

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]

���������������������YARNRunner.submitJob()������������

������������������YARNRunner���������������������������������

package org.apache.hadoop.mapred;public class YARNRunner implements ClientProtocol {  private ResourceMgrDelegate resMgrDelegate; //������RM������������������������������������  private ClientCache clientCache;  private Configuration conf;  private final FileContext defaultFileContext;    public YARNRunner(Configuration conf) {//������������������������������������,���������������������������������   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));  }  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {//������������ClientCache   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));  }  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,      ClientCache clientCache) {//���������������������������    this.conf = conf;    try {      this.resMgrDelegate = resMgrDelegate;      this.clientCache = clientCache;      this.defaultFileContext = FileContext.getFileContext(this.conf);    } catch (UnsupportedFileSystemException ufe) {      throw new RuntimeException("Error in instantiating YarnClient", ufe);    }  }public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws IOException, InterruptedException {    addHistoryToken(ts);//���������������������������,������������������(JobHistory)���������    // Construct necessary information to start the MR AM  //������MR AM���������������������  //������������ApplicationSubmissionContext,������conf������������������������������  ApplicationSubmissionContext appContext =    createApplicationSubmissionContext(conf, jobSubmitDir, ts);  // Submit to ResourceManager  try { /* ���������������������������������(ResourceManager)*///RM���������������������������������,������������ContainerLaunchContext���������������NM������//���,���������������������shell���������,������������Java���������,������������MRAppMaster.class���//������������,������ApplicationSubmissionContext������appContext,���������������������ResourceManager//���������������������ApplicationMaster������������������������    ApplicationId applicationId =        resMgrDelegate.submitApplication(appContext);    ApplicationReport appMaster = resMgrDelegate        .getApplicationReport(applicationId);    String diagnostics =        (appMaster == null ?            "application report is null" : appMaster.getDiagnostics());    if (appMaster == null        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {      throw new IOException("Failed to run job : " +          diagnostics);    }    return clientCache.getClient(jobId).getJobStatus(jobId);  } catch (YarnException e) {    throw new IOException(e);  }}}

������createApplicationSubmissionContext������������������

1���������������������������������1536M���cpu���core���1
2���������������������������������������������������jar������
3���������������������tokens
4���������������AM���������
5���������map���reduce���������������
6���������������CLASSPATH���
7������AM���container������ContainerLaunchContext
8���������ApplicationSubmissionContext
9���������MRAppMaster���������������
���������������conf������������������������������������������������������������������������������������������������������������������������������������������������ApplicationMaster������������������������������������Shell(������bash)���������������������������������������������������������������������

���������������������ResourceMgrDelegate.submitApplication������������������������������������ResourceMgrDelegate������������

public class ResourceMgrDelegate extends YarnClient {     private YarnConfiguration conf;  private ApplicationSubmissionContext application;  private ApplicationId applicationId;  protected YarnClient client;//������������YarnClientImpl������������,������������YarnClient������������������  private Text rmDTService;  //������ResourceMgrDelegate���������������  public ResourceMgrDelegate(YarnConfiguration conf) {    super(ResourceMgrDelegate.class.getName());    this.conf = conf;  //������YarnClient������client  //YarnClient.createYarnClient()������������YarnClientImpl    this.client = YarnClient.createYarnClient();    init(conf);//���������AbstractService������������,YarnClient������AbstractService���������    start();//������������AbstractService������������  }public ApplicationId      submitApplication(ApplicationSubmissionContext appContext)          throws YarnException, IOException {    return client.submitApplication(appContext);//������YarnClientImpl.submitApplication������  }

������������������������������������������������

ResourceMgrDelegate������������YARNRunner���������������������������������YARNRunner,������������������Cluster.Initialize()������������������������������,���Cluster���������������������������connect()���������������������,������������������,���������������������connect(),������������������������������������,���������������������Cluster���������,������������������YARNRunner������,���������������ResourceMgrDelegate������,������������������������������YarnClientImpl���������

������������������������������������������������

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication()]

������������������������YarnClientImpl.submitApplication()���������

public ApplicationId      submitApplication(ApplicationSubmissionContext appContext)          throws YarnException, IOException {    ApplicationId applicationId = appContext.getApplicationId();    if (applicationId == null) {      throw new ApplicationIdNotProvidedException(          "ApplicationId is not provided in ApplicationSubmissionContext");    }    //������������SubmitApplicationRequestPBImpl���������������    SubmitApplicationRequest request =        Records.newRecord(SubmitApplicationRequest.class);    request.setApplicationSubmissionContext(appContext);//������������������������Context    // Automatically add the timeline DT into the CLC    // Only when the security and the timeline service are both enabled    if (isSecurityEnabled() && timelineServiceEnabled) {      addTimelineDelegationToken(appContext.getAMContainerSpec());    }    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.    rmClient.submitApplication(request);//������������������������    int pollCount = 0;    long startTime = System.currentTimeMillis();    EnumSet
waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet
failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED); while (true) { try { //������������RM���������������������������,������������������������������������ ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break;//���������������������������,������while������ } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { String msg = "Interrupted while waiting for application " + applicationId + " to be successfully submitted."; LOG.error(msg); throw new YarnException(msg, ie); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request);//������������������������ } } return applicationId; }

������������������������������rmClient.submitApplication(request)������������������rmClient���������������������������������������������YarnClientImpl���������������������������

public class YarnClientImpl extends YarnClient {  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);  protected ApplicationClientProtocol rmClient;  protected long submitPollIntervalMillis;  private long asyncApiPollIntervalMillis;  private long asyncApiPollTimeoutMillis;  protected AHSClient historyClient;  private boolean historyServiceEnabled;  protected TimelineClient timelineClient;  @VisibleForTesting  Text timelineService;  @VisibleForTesting  String timelineDTRenewer;  protected boolean timelineServiceEnabled;  protected boolean timelineServiceBestEffort;  private static final String ROOT = "root";  public YarnClientImpl() {    super(YarnClientImpl.class.getName());  }

������������������rmClient���������ApplicationClientProtocol���������������������������������������������������������ApplicationClientProtocolPBClientImpl ���������������������������������������

public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,    Closeable {  private ApplicationClientProtocolPB proxy;  public ApplicationClientProtocolPBClientImpl(long clientVersion,      InetSocketAddress addr, Configuration conf) throws IOException {//���������������rpc.engine.ApplicationClientProtocolPB������������ProtobufRpcEngine     RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,      ProtobufRpcEngine.class);//������proxy//������proxy���������������������������������������������������������JVM���,���������������//ResourceManager,������������NodeManager,���������������������Java���������,������������������//���������������������������    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);  }    public SubmitApplicationResponse submitApplication(    SubmitApplicationRequest request) throws YarnException,    IOException {//���������request������������������������(message)������   SubmitApplicationRequestProto requestProto =      ((SubmitApplicationRequestPBImpl) request).getProto();  try {//������proxy���������������������,������������������������ //���������������������������SubmitApplicationResponsePBImpl������     return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));  } catch (ServiceException e) {    RPCUtil.unwrapAndThrowException(e);    return null;  }} }

ApplicationClientProtocolPBClientImpl���submitApplication���������������������������������proxy.submitApplication������������proxy���������������������������������

������������proxy���������SubmitApplicationRequest,������RM������������������,���������������������������������������������������TCP���������������������RM������������������������������,������������

ProtoBuf,���������TCP������������������������������������������������������,���������������������������ApplicationClientProtocolPB���������ApplicationClientProtocolPBServiceImpl,ProtoBuf������
������������������������������������������submitApplication()���������,Client������������ApplicationClientProtocolPBClientImpl������������������������������������Server������������applicationClientProtocolPBServiceImpl������������������������������������������,Server���������������������������������������������Client������������������,���������������������������������RPC���������������,Client/Server���������������������������������������������������������������,���������������ApplicationClientProtocolPB���

Client���

YARNRunner.submitJob() //������������������������������
ResourceMgrDelegate.submitApplication() //������RM���������
YarnClientImpl.submitApplication() //YARN���������Client������
ApplicationClientProtocolPBClientImpl.submitApplication()//ApplicationClientProtocol������
proxy.submitApplication() //ApplicationClientProtocolPB������
Protocol���������������submitApplication() //���TCP/IP������������������������������������
Socket���TCP/IP //������������������������������

 

Server������

Server���������������������������Server���������,���������������������������������������������������,���������������������������Socket���TCP/IP���������������������������������������,���������������������������������������������������������������������TCP/IP������������������������������,������������������������������������������������������������������������,������������������������������������������������������������������������������

���������������������������tcp/ip���������������ApplicationClientProtocolPBServiceImpl.submitApplication()���������

public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {  private ApplicationClientProtocol real;    public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {    this.real = impl;  }  public SubmitApplicationResponseProto submitApplication(RpcController arg0,    SubmitApplicationRequestProto proto) throws ServiceException {  SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);//������������������  try {    SubmitApplicationResponse response = real.submitApplication(request); //real���ClientRMService��������� ���������������RM���������������createClientRMService() ������������    return ((SubmitApplicationResponsePBImpl)response).getProto();  } catch (YarnException e) {    throw new ServiceException(e);  } catch (IOException e) {    throw new ServiceException(e);  }}}

���������������ClientRMService.submitApplication(request); ������

public SubmitApplicationResponse submitApplication(    SubmitApplicationRequest request) throws YarnException {  ApplicationSubmissionContext submissionContext = request      .getApplicationSubmissionContext();  ApplicationId applicationId = submissionContext.getApplicationId();  // ApplicationSubmissionContext needs to be validated for safety - only  // those fields that are independent of the RM's configuration will be  // checked here, those that are dependent on RM configuration are validated  // in RMAppManager.  String user = null;  try {    // Safety    user = UserGroupInformation.getCurrentUser().getShortUserName();//������������  } catch (IOException ie) {    LOG.warn("Unable to get the current user.", ie);    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,        ie.getMessage(), "ClientRMService",        "Exception in submitting application", applicationId);    throw RPCUtil.getRemoteException(ie);  }  // Check whether app has already been put into rmContext,  // If it is, simply return the response//���������������������������������������������������������������  if (rmContext.getRMApps().get(applicationId) != null) {    LOG.info("This is an earlier submitted application: " + applicationId);    return SubmitApplicationResponse.newInstance();  }  //������������������������������������������������   if (submissionContext.getQueue() == null) {     submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);  }  //������������������application���������������������������������������  if (submissionContext.getApplicationName() == null) {    submissionContext.setApplicationName(        YarnConfiguration.DEFAULT_APPLICATION_NAME);  }  //���������������������������������������������������yarn������  if (submissionContext.getApplicationType() == null) {    submissionContext      .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);  } else {    if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {      submissionContext.setApplicationType(submissionContext        .getApplicationType().substring(0,          YarnConfiguration.APPLICATION_TYPE_LENGTH));    }  }  try {    // call RMAppManager to submit application directly    rmAppManager.submitApplication(submissionContext,        System.currentTimeMillis(), user);//���������������rmAppManager������    LOG.info("Application with id " + applicationId.getId() +         " submitted by user " + user);    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,        "ClientRMService", applicationId);  } catch (YarnException e) {    LOG.info("Exception in submitting application with id " +        applicationId.getId(), e);    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,        e.getMessage(), "ClientRMService",        "Exception in submitting application", applicationId);    throw e;  }  SubmitApplicationResponse response = recordFactory      .newRecordInstance(SubmitApplicationResponse.class);  return response;}

���������������������������,��������������� RM ������������RMAppManagers. ubmitApplication(),������������������������������ ������������������������,������ RM������������������������������������������������

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication() ]

上一篇:hadoop2.7作业提交详解之文件分片
下一篇:linux文件系统详解

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年04月19日 19时51分20秒