hadoop2.7之作业提交详解(上)
发布日期:2021-05-09 09:32:51 浏览次数:18 分类:博客文章

本文共 27517 字,大约阅读时间需要 91 分钟。

������wordcount���������������

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/** * @author: LUGH1 * @date: 2019-4-8 * @description: */public class WordCount {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        Configuration conf = new Configuration();        conf.set("fs.defaultFS","hdfs://192.168.88.130:9000");        Job job = Job.getInstance(conf);        job.setJarByClass(WordCount.class);        job.setMapperClass(WdMapper.class);        job.setReducerClass(WdReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));        FileOutputFormat.setOutputPath(job,  new Path("/test/output"));        boolean result = job.waitForCompletion(true);        System.exit(result?0:1);        System.out.println("good job");    }}class WdMapper extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(" "); for(String word : split){ context.write(new Text(word), new IntWritable(1)); } }}class WdReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable i : values){ count += i.get(); } context.write(key,new IntWritable(count)); }}

���������������������wordcount���������������������������������������������������������main���������������������job������������������������������������������������������waitForCompletion������

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {   //....������������������.....   boolean result = job.waitForCompletion(true);  //���������Job������������������waitForCompletion()������������   System.exit(result?0:1); }

������������������������������������waitForCompletion������������������Job������������������������������������������������������������������������

public class Job extends JobContextImpl implements JobContext {                                                                                                                                                                                                                                                                                                                                                                                                                                                                  private static final Log LOG = LogFactory.getLog(Job.class);  public static enum JobState {DEFINE, RUNNING}; //������������������  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;  //������������2000������������������  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";  public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY ="mapreduce.client.progressmonitor.pollinterval";  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;  public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";  public static final String SUBMIT_REPLICATION =  "mapreduce.client.submit.file.replication";  public static final int DEFAULT_SUBMIT_REPLICATION = 10;  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }  static {    ConfigUtil.loadResources();  //������������  }  private JobState state = JobState.DEFINE;  //���������������������������������������DEFINE������  private JobStatus status;  private long statustime;  private Cluster cluster;  private ReservationId reservationId;     boolean waitForCompletion(booleanverbose)  submit()  setUseNewAPI() connect() getJobSubmitter(FileSystemfs, ClientProtocolsubmitClient) isUber() //������������������������(MapTask���ReduceTask������������������) setPartitionerClass()//Mapper���������������������Partitioner������������������������������Reducer setMapSpeculativeExecution() //���������������Speculative���Mapper��������������������� setReduceSpeculativeExecution() //���������������Speculative���Reducer��������������������� setCacheFiles() }

���������Job���������������������������������������������������������������java���������������������������������������������������������������������������main���������������Job job = Job.getInstance(conf);������������������������������������������������������������������������������������������������������������������������������������������������job������������������DEFINE������������������������������������������������������������������������������

public static void loadResources() {    addDeprecatedKeys();    Configuration.addDefaultResource("mapred-default.xml");    Configuration.addDefaultResource("mapred-site.xml");    Configuration.addDefaultResource("yarn-default.xml");    Configuration.addDefaultResource("yarn-site.xml");  }

���������������������������������hadoop���������������������������������������������waitForCompletion������������������������������������������������������������������waitForCompletion���������

//org.apache.hadoop.mapreduce������Job���public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {if (state == JobState.DEFINE) {   //���������������������DEFINE���������������������������������    submit();  //������������ }  if (verbose) { //������������������������������������������������������monitorAndPrintJob();   //��������������������������������� } else {   //���������������������������������������������    // get the completion poll interval from the client.    int completionPollIntervalMillis =  Job.getCompletionPollInterval(cluster.getConf());    while (!isComplete()) {      try {       Thread.sleep(completionPollIntervalMillis);       } catch (InterruptedException ie) {      }    } }  return isSuccessful();}

������

���������������������������������������,���������������������������������������,���������������Job.submit()���������,��������������������������������������������������������� DEFINE ������,��������������������������������������������� ������������,JobState������������ DEFINE ��� RUNNING ������,������Job���������������������������������Job()������������������ DEFINE,��������������������������������������� RUNNING,������������������������

���������������������������,Job.submit() ������������������,������������������������������������������������������,������������������������������������������ ������,���Job.submit()������������,Job.waitForCompletion()������������������������������������������������������ ���������������,������������verbose���true,������������������������������������������,���������������������������������������������������������

���������������������������������������������

[WordCount.main() -> Job.waitForCompletion() -> Job.submit() ]

������������������������������������submit���������

public void submit()          throws IOException, InterruptedException, ClassNotFoundException {    ensureState(JobState.DEFINE); //������������������������DEFINE    setUseNewAPI(); //������������������������������������API������    connect(); //���������������������������Cluster���cluster������    final JobSubmitter submitter =         getJobSubmitter(cluster.getFileSystem(), cluster.getClient());//������JobSubmitter���������������submitter     status = ugi.doAs(new PrivilegedExceptionAction
() { //ugi.doAs������������������ public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); //������������������������ } }); state = JobState.RUNNING; //������job������������RUNNING LOG.info("The url to track the job: " + getTrackingURL()); }

���������������������connect���������

private synchronized void connect()          throws IOException, InterruptedException, ClassNotFoundException {    if (cluster == null) { //������cluter������������������������������cluster������      cluster =         ugi.doAs(new PrivilegedExceptionAction
() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); //������cluster } }); } }

������connect()������������������������������������Cluster���������,���������������,������������������ ���������������������Cluster���������������������������������

public class Cluster {  @InterfaceStability.Evolving  public static enum JobTrackerStatus {INITIALIZING, RUNNING}; //������������������  private ClientProtocolProvider clientProtocolProvider; //������������YarnClientProtocolProvider ������������������LocalClientProtocolProvider  private ClientProtocol client;  //������������������,���������������������������������������  private UserGroupInformation ugi; //������������������  private Configuration conf;  //������������  private FileSystem fs = null; //������������  private Path sysDir = null; //������������  private Path stagingAreaDir = null;   private Path jobHistoryDir = null; //������������������  private static final Log LOG = LogFactory.getLog(Cluster.class);//ServiceLoader
,������������//ClientProtocolProvider������ServiceLoader,���������������������ServiceLoaderl.oad()���������ServiceLoader���������Iterable������, //������������iterator()������,������������������for������������//���������������������load()������,������������ClassLoader������Class private static ServiceLoader
frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class); static { ConfigUtil.loadResources(); //������������������ }//���������public Cluster(Configuration conf) throws IOException { this(null, conf);}//���������public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); initialize(jobTrackAddr, conf); //������initialize������}//������������������ClientProtocolProvider���ClientProtocolprivate void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { synchronized (frameworkLoader) { //���������������������������������������������,������������ for (ClientProtocolProvider provider : frameworkLoader) { //������frameworkLoader������provider LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = null; try { if (jobTrackAddr == null) { //������ClientProtocolProvider���create������������clientProtocol clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } if (clientProtocol != null) { clientProtocolProvider = provider; client = clientProtocol; //���������������ClientProtocol������,YARNRunner���LocalJobRunner LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; //��������������������� } else { //��������������������� LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", e); } } } if (null == clientProtocolProvider || null == client) { //���������������������ClientProtocolProvider���ClientProtocol������ throw new IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); }}

������������������job������connect���������������������������cluster������������������������Cluster���������������������������������������������������������������������������ConfigUtil.loadResources()���������������������frameworkLoader���������������������������Cluster���������������������Cluster������������������������������Cluster.initialize()���������������ClientProtocolProvider���ClientProtocol������������RM������������������,������RM������������������,������RM������������������������������,������������������������������������������������,������������������������,������������������������,������������������������,������������������������Hadoop������������,���������Protocol������������������������������������������������������,���������������YARN���������������������������������������������ClientProtocol������������������������,���ClientProtocolProvider���������������ClientProtocol������������,������������������Factory������������

������ServiceLoader<ClientProtocolProvider>,������������������ClientProtocolProvider������

������������������������������ClientProtocolProvider������������������������������,���������������������������������������������������������������������������������������������

public abstract class ClientProtocolProvider {    public abstract ClientProtocol create(Configuration conf) throws IOException;    public abstract ClientProtocol create(InetSocketAddress addr,      Configuration conf) throws IOException;  public abstract void close(ClientProtocol clientProtocol) throws IOException;}

���������������������������������������������������YarnClientProtocolProvider���LocalClientProtocolProvider 

package org.apache.hadoop.mapred;public class YarnClientProtocolProvider extends ClientProtocolProvider {  @Override  public ClientProtocol create(Configuration conf) throws IOException {    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {     return new YARNRunner(conf); //YARNRunner���������ClientProtocol������    }    return null;  }  @Override   public ClientProtocol create(InetSocketAddress addr, Configuration conf)      throws IOException {    return create(conf);  }  @Override  public void close(ClientProtocol clientProtocol) throws IOException {    if (clientProtocol instanceof YARNRunner) {      ((YARNRunner)clientProtocol).close();    }  }
package org.apache.hadoop.mapred;public class LocalClientProtocolProvider extends ClientProtocolProvider {  @Override  public ClientProtocol create(Configuration conf) throws IOException {    String framework =        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {      return null;    }    conf.setInt(JobContext.NUM_MAPS, 1); //map������1    return new LocalJobRunner(conf); //LocalJobRunner���������ClientProtocol������  }  @Override  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {    return null; // LocalJobRunner doesn't use a socket  }  @Override  public void close(ClientProtocol clientProtocol) {    // no clean up required  }

������������������������Cluster.initialize()���������

������������ServiceLoader���������Iterable������,������������iterator()������,������������������for���������������������������������load()������,������������ClassLoader������Class���������,������������������������������������������������������ServiceLoader���������frameworkLoader,���LinkedHashMap���������������������������������,������������������������iterator()������������������������������������

������������,���Cluster������������������������������������initialize(),������������������ClientProtocolProvider���ClientProtocol���

������������ClientProtocolProvider���������������,���������������������������������������������������������������������������������������������Hadoop������������������������������������������������������������������,���������LocalClientProtocolProvider���YarnClientProtocolProvide

 

������������������,������������ClientProtocolProvider���������ClientProtocol������������������������������ClientProtocol������������,���������������������������������������,���������LocalJobRunner���YARNRunner������������������������������������������������

������initialize���for������,���������������ServiceLoader���iterator()���������������������������������������ClientProtocolProvider���������,������������������ClientProtocolProvider.create()������������������������ClientProtocol,���������������LocalJobRunner���YARNRunner������������������������������,������������������������������,���������������������������;������,���������������������,������������������������,���������������������������RM������������������������������������������,���������������������������������������������ClientProtocolProvider������������,���������������������������������LocalClientProtocolProvider���YarnClientProtocolProvider���������������������������������������������������,������������������������������

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]

���������������������������������������������

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]

���������������������yarn������������������������������������������

������YarnClientProtocolProvider.create()���������������������������������new YARNRunner(conf)���������

������������������������������������Job.submit()������������������connect���������������������������������������������getJobSubmitter()������������ ������������������������JobSubmitter���������,������Jobs. ubmit()���������������submitJobInternal()������,������������������������������JobSubmitter������������������������������������getJobSubmitter()������������������,������cluster.getFileSystem()���cluster.getClient()��� ������cluster.getClient()��������������� YARNRunner���LocalJobRunner;���cluster.getFileSystem()��������������������� YARNRunner��� RM ������������������������ URL,������ LocalJobRunner���������������������������������������������mapred/system���������������

������������������������JobSubmitter������������������������������

 

package org.apache.hadoop.mapreduce;class JobSubmitter {  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; //shuffle������  private static final int SHUFFLE_KEY_LENGTH = 64;  private FileSystem jtFs;  private ClientProtocol submitClient;  private String submitHostName;  private String submitHostAddress;  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)   throws IOException {    this.submitClient = submitClient; //���������������������YARNRunner     this.jtFs = submitFs;  }compareFs(FileSystemsrcFs, FileSystemdestFs) //������������������������������������getPathURI()checkSpecs()copyRemoteFiles()copyAndConfigureFiles()copyJar(PathoriginalJarPath, PathsubmitJarFile,shortreplication)addMRFrameworkToDistributedCache()submitJobInternal(Jobjob, Clustercluster) //������������������������writeNewSplits(JobContextjob, PathjobSubmitDir) getJobSubmitter(FileSystem fs, ClientProtocol submitClient)//���������������������JobSubmitter��������������� }

 

���������������submitJobInternal������

JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {  //validate the jobs output specs ���������������������������   checkSpecs(job);  Configuration conf = job.getConfiguration(); //������������������  addMRFrameworkToDistributedCache(conf); //���������������  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// ������������������  //configure the command line options correctly on the submitting dfs  InetAddress ip = InetAddress.getLocalHost(); //���������������������������������ip������  if (ip != null) {    submitHostAddress = ip.getHostAddress();//���������IP������������������������     submitHostName = ip.getHostName();//���������������     conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); //������������conf���    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);  }  JobID jobId = submitClient.getNewJobID(); //������JOBId���������ID���������  job.setJobID(jobId); //������job���id  Path submitJobDir = new Path(jobStagingArea, jobId.toString());//������������������������������������������������ID������   JobStatus status = null;  try {    conf.set(MRJobConfig.USER_NAME,        UserGroupInformation.getCurrentUser().getShortUserName()); //���������������    conf.set("hadoop.http.filter.initializers",         "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");//������������Http���������������������������     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());//������������job���������    LOG.debug("Configuring job " + jobId + " with " + submitJobDir         + " as the submit dir");    // get delegation token for the dir  /* ���������������������������������������(token) */     TokenCache.obtainTokensForNamenodes(job.getCredentials(),        new Path[] { submitJobDir }, conf); //���������NameNode���������������������         populateTokenCache(conf, job.getCredentials());    // generate a secret to authenticate shuffle transfers//������������Mapper���Reducer������������������������������������     if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {      KeyGenerator keyGen;      try {        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);        keyGen.init(SHUFFLE_KEY_LENGTH);      } catch (NoSuchAlgorithmException e) {        throw new IOException("Error generating shuffle secret key", e);      }      SecretKey shuffleKey = keyGen.generateKey();      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),          job.getCredentials());    }    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +              "data spill is enabled");    }    copyAndConfigureFiles(job, submitJobDir);//���������������������������������HDFS������������������������10���������������������������������    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);//������������������         // Create the splits for the job    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));    int maps = writeSplits(job, submitJobDir);    //������map������������������������map���������������������������������������    conf.setInt(MRJobConfig.NUM_MAPS, maps);    LOG.info("number of splits:" + maps);    // write "queue admins of the queue to which job is being submitted"  to job file.    String queue = conf.get(MRJobConfig.QUEUE_NAME,        JobConf.DEFAULT_QUEUE_NAME); //���������������������������������default���    AccessControlList acl = submitClient.getQueueAdmins(queue);    conf.set(toFullPropertyName(queue,        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());  //������acl������     // removing jobtoken referrals before copying the jobconf to HDFS    // as the tasks don't need this setting, actually they may break    // because of it if present as the referral will point to a    // different job.    TokenCache.cleanUpTokenReferral(conf); //������Token���������������    if (conf.getBoolean(        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {      // Add HDFS tracking ids ���������������������������������      ArrayList
trackingIds = new ArrayList
(); for (Token
t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); //������������������������������ } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); //������������������ } // Set reservation info if it exists��������������������������������� ReservationId reservationId = job.getReservationId(); if (reservationId != null) { conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString()); } // Write job file to submit dir writeConf(conf, submitJobFile);//���conf���������������������.xml������ // // Now, actually submit the job (using the submit name) // printTokens(jobId, job.getCredentials());//������������,������YarnRunner.submitJob()���LocalJobRunner.submitJob() status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; //������������ } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); // ������������������ } }}

���submitJobInternal���������������������������������������������������������������������������������:

������������������������������������������RM������,���RM������������������������������;

������������������������RM������������,������������������������������������������������������������������������������������������IP���������������������������������������ID���,������������MapReduce���������������������������������,������������������������������������������(Token)������������������������������������������RM,������������������������������,������������������������������������������������������jar���������������������������������������������������������������������������,���������������������������������������������������������������������RM,������RM������������������������������������,������������������������������������������������������������HDFS���������������,���������������������������������������������������

������������������������������������������,���������HDFS������������������������������������������������HDFS������������������������������������������������������������,���������������������(stagingdirectory)���������������������������JobSubmissionFiles.getStagingDir()���������������������������������������������������������������ID,���JobId������������������������������������������������������������������,���������������������submitJobDir���������������������������������������������������,������������������������������������

���������������������������������map������������������������������������connect()������������������������YARNRunner������������LocalJobRunner������submitJob������������������������������������������RM���������������������������

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]

������������������

 

 

 

 

 
上一篇:linux文件系统详解
下一篇:linux之压缩和解压

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2025年04月21日 15时00分59秒