
本文共 27517 字,大约阅读时间需要 91 分钟。
������wordcount���������������
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/** * @author: LUGH1 * @date: 2019-4-8 * @description: */public class WordCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://192.168.88.130:9000"); Job job = Job.getInstance(conf); job.setJarByClass(WordCount.class); job.setMapperClass(WdMapper.class); job.setReducerClass(WdReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("/test/word.txt")); FileOutputFormat.setOutputPath(job, new Path("/test/output")); boolean result = job.waitForCompletion(true); System.exit(result?0:1); System.out.println("good job"); }}class WdMapper extends Mapper
���������������������wordcount���������������������������������������������������������main���������������������job������������������������������������������������������waitForCompletion������
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //....������������������..... boolean result = job.waitForCompletion(true); //���������Job������������������waitForCompletion()������������ System.exit(result?0:1); }
������������������������������������waitForCompletion������������������Job������������������������������������������������������������������������
public class Job extends JobContextImpl implements JobContext { private static final Log LOG = LogFactory.getLog(Job.class); public static enum JobState {DEFINE, RUNNING}; //������������������ private static final long MAX_JOBSTATUS_AGE = 1000 * 2; //������������2000������������������ public static final String OUTPUT_FILTER = "mapreduce.client.output.filter"; public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval"; static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000; public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY ="mapreduce.client.progressmonitor.pollinterval"; static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000; public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used"; public static final String SUBMIT_REPLICATION = "mapreduce.client.submit.file.replication"; public static final int DEFAULT_SUBMIT_REPLICATION = 10; public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } static { ConfigUtil.loadResources(); //������������ } private JobState state = JobState.DEFINE; //���������������������������������������DEFINE������ private JobStatus status; private long statustime; private Cluster cluster; private ReservationId reservationId; boolean waitForCompletion(booleanverbose) submit() setUseNewAPI() connect() getJobSubmitter(FileSystemfs, ClientProtocolsubmitClient) isUber() //������������������������(MapTask���ReduceTask������������������) setPartitionerClass()//Mapper���������������������Partitioner������������������������������Reducer setMapSpeculativeExecution() //���������������Speculative���Mapper��������������������� setReduceSpeculativeExecution() //���������������Speculative���Reducer��������������������� setCacheFiles() }
���������Job���������������������������������������������������������������java���������������������������������������������������������������������������main���������������Job job = Job.getInstance(conf);������������������������������������������������������������������������������������������������������������������������������������������������job������������������DEFINE������������������������������������������������������������������������������
public static void loadResources() { addDeprecatedKeys(); Configuration.addDefaultResource("mapred-default.xml"); Configuration.addDefaultResource("mapred-site.xml"); Configuration.addDefaultResource("yarn-default.xml"); Configuration.addDefaultResource("yarn-site.xml"); }
���������������������������������hadoop���������������������������������������������waitForCompletion������������������������������������������������������������������waitForCompletion���������
//org.apache.hadoop.mapreduce������Job���public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {if (state == JobState.DEFINE) { //���������������������DEFINE��������������������������������� submit(); //������������ } if (verbose) { //������������������������������������������������������monitorAndPrintJob(); //��������������������������������� } else { //��������������������������������������������� // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful();}
������
���������������������������������������,���������������������������������������,���������������Job.submit()���������,��������������������������������������������������������� DEFINE ������,��������������������������������������������� ������������,JobState������������ DEFINE ��� RUNNING ������,������Job���������������������������������Job()������������������ DEFINE,��������������������������������������� RUNNING,������������������������
���������������������������,Job.submit() ������������������,������������������������������������������������������,������������������������������������������ ������,���Job.submit()������������,Job.waitForCompletion()������������������������������������������������������ ���������������,������������verbose���true,������������������������������������������,������������������������������������������������������������������������������������������������������
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() ]
������������������������������������submit���������
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); //������������������������DEFINE setUseNewAPI(); //������������������������������������API������ connect(); //���������������������������Cluster���cluster������ final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());//������JobSubmitter���������������submitter status = ugi.doAs(new PrivilegedExceptionAction() { //ugi.doAs������������������ public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); //������������������������ } }); state = JobState.RUNNING; //������job������������RUNNING LOG.info("The url to track the job: " + getTrackingURL()); }
���������������������connect���������
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { //������cluter������������������������������cluster������ cluster = ugi.doAs(new PrivilegedExceptionAction() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); //������cluster } }); } }
������connect()������������������������������������Cluster���������,���������������,������������������ ���������������������Cluster���������������������������������
public class Cluster { @InterfaceStability.Evolving public static enum JobTrackerStatus {INITIALIZING, RUNNING}; //������������������ private ClientProtocolProvider clientProtocolProvider; //������������YarnClientProtocolProvider ������������������LocalClientProtocolProvider private ClientProtocol client; //������������������,��������������������������������������� private UserGroupInformation ugi; //������������������ private Configuration conf; //������������ private FileSystem fs = null; //������������ private Path sysDir = null; //������������ private Path stagingAreaDir = null; private Path jobHistoryDir = null; //������������������ private static final Log LOG = LogFactory.getLog(Cluster.class);//ServiceLoader,������������//ClientProtocolProvider������ServiceLoader,���������������������ServiceLoaderl.oad()���������ServiceLoader���������Iterable������, //������������iterator()������,������������������for������������//���������������������load()������,������������ClassLoader������Class private static ServiceLoader frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class); static { ConfigUtil.loadResources(); //������������������ }//���������public Cluster(Configuration conf) throws IOException { this(null, conf);}//���������public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); initialize(jobTrackAddr, conf); //������initialize������}//������������������ClientProtocolProvider���ClientProtocolprivate void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { synchronized (frameworkLoader) { //���������������������������������������������,������������ for (ClientProtocolProvider provider : frameworkLoader) { //������frameworkLoader������provider LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = null; try { if (jobTrackAddr == null) { //������ClientProtocolProvider���create������������clientProtocol clientProtocol = provider.create(conf); } else { clientProtocol = provider.create(jobTrackAddr, conf); } if (clientProtocol != null) { clientProtocolProvider = provider; client = clientProtocol; //���������������ClientProtocol������,YARNRunner���LocalJobRunner LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; //��������������������� } else { //��������������������� LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", e); } } } if (null == clientProtocolProvider || null == client) { //���������������������ClientProtocolProvider���ClientProtocol������ throw new IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); }}
������������������job������connect���������������������������cluster������������������������Cluster���������������������������������������������������������������������������ConfigUtil.loadResources()���������������������frameworkLoader���������������������������Cluster���������������������Cluster������������������������������Cluster.initialize()���������������ClientProtocolProvider���ClientProtocol������������RM������������������,������RM������������������,������RM������������������������������,������������������������������������������������,������������������������,������������������������,������������������������,������������������������Hadoop������������,���������Protocol������������������������������������������������������,���������������YARN���������������������������������������������ClientProtocol������������������������,���ClientProtocolProvider���������������ClientProtocol������������,������������������Factory������������
������ServiceLoader<ClientProtocolProvider>,������������������ClientProtocolProvider������
������������������������������ClientProtocolProvider������������������������������,���������������������������������������������������������������������������������������������
public abstract class ClientProtocolProvider { public abstract ClientProtocol create(Configuration conf) throws IOException; public abstract ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException; public abstract void close(ClientProtocol clientProtocol) throws IOException;}
���������������������������������������������������YarnClientProtocolProvider���LocalClientProtocolProvider
package org.apache.hadoop.mapred;public class YarnClientProtocolProvider extends ClientProtocolProvider { @Override public ClientProtocol create(Configuration conf) throws IOException { if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { return new YARNRunner(conf); //YARNRunner���������ClientProtocol������ } return null; } @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException { return create(conf); } @Override public void close(ClientProtocol clientProtocol) throws IOException { if (clientProtocol instanceof YARNRunner) { ((YARNRunner)clientProtocol).close(); } }
package org.apache.hadoop.mapred;public class LocalClientProtocolProvider extends ClientProtocolProvider { @Override public ClientProtocol create(Configuration conf) throws IOException { String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) { return null; } conf.setInt(JobContext.NUM_MAPS, 1); //map������1 return new LocalJobRunner(conf); //LocalJobRunner���������ClientProtocol������ } @Override public ClientProtocol create(InetSocketAddress addr, Configuration conf) { return null; // LocalJobRunner doesn't use a socket } @Override public void close(ClientProtocol clientProtocol) { // no clean up required }
������������������������Cluster.initialize()���������
������������ServiceLoader���������Iterable������,������������iterator()������,������������������for���������������������������������load()������,������������ClassLoader������Class���������,������������������������������������������������������ServiceLoader���������frameworkLoader,���LinkedHashMap���������������������������������,������������������������iterator()������������������������������������
������������,���Cluster������������������������������������initialize(),������������������ClientProtocolProvider���ClientProtocol���
������������ClientProtocolProvider���������������,���������������������������������������������������������������������������������������������Hadoop������������������������������������������������������������������,���������LocalClientProtocolProvider���YarnClientProtocolProvide
������������������,������������ClientProtocolProvider���������ClientProtocol������������������������������ClientProtocol������������,���������������������������������������,���������LocalJobRunner���YARNRunner������������������������������������������������
������initialize���for������,���������������ServiceLoader���iterator()���������������������������������������ClientProtocolProvider���������,������������������ClientProtocolProvider.create()������������������������ClientProtocol,���������������LocalJobRunner���YARNRunner������������������������������,������������������������������,���������������������������;������,���������������������,������������������������,���������������������������RM������������������������������������������,���������������������������������������������ClientProtocolProvider������������,���������������������������������LocalClientProtocolProvider���YarnClientProtocolProvider���������������������������������������������������,������������������������������
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]
���������������������������������������������
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]
���������������������yarn������������������������������������������
������YarnClientProtocolProvider.create()���������������������������������new YARNRunner(conf)���������
������������������������������������Job.submit()������������������connect���������������������������������������������getJobSubmitter()������������ ������������������������JobSubmitter���������,������Jobs. ubmit()���������������submitJobInternal()������,������������������������������JobSubmitter������������������������������������getJobSubmitter()������������������,������cluster.getFileSystem()���cluster.getClient()��� ������cluster.getClient()��������������� YARNRunner���LocalJobRunner;���cluster.getFileSystem()��������������������� YARNRunner��� RM ������������������������ URL,������ LocalJobRunner���������������������������������������������mapred/system���������������
������������������������JobSubmitter������������������������������
package org.apache.hadoop.mapreduce;class JobSubmitter { protected static final Log LOG = LogFactory.getLog(JobSubmitter.class); private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; //shuffle������ private static final int SHUFFLE_KEY_LENGTH = 64; private FileSystem jtFs; private ClientProtocol submitClient; private String submitHostName; private String submitHostAddress; JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) throws IOException { this.submitClient = submitClient; //���������������������YARNRunner this.jtFs = submitFs; }compareFs(FileSystemsrcFs, FileSystemdestFs) //������������������������������������getPathURI()checkSpecs()copyRemoteFiles()copyAndConfigureFiles()copyJar(PathoriginalJarPath, PathsubmitJarFile,shortreplication)addMRFrameworkToDistributedCache()submitJobInternal(Jobjob, Clustercluster) //������������������������writeNewSplits(JobContextjob, PathjobSubmitDir) getJobSubmitter(FileSystem fs, ClientProtocol submitClient)//���������������������JobSubmitter��������������� }
���������������submitJobInternal������
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs ��������������������������� checkSpecs(job); Configuration conf = job.getConfiguration(); //������������������ addMRFrameworkToDistributedCache(conf); //��������������� Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// ������������������ //configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); //���������������������������������ip������ if (ip != null) { submitHostAddress = ip.getHostAddress();//���������IP������������������������ submitHostName = ip.getHostName();//��������������� conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); //������������conf��� conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); //������JOBId���������ID��������� job.setJobID(jobId); //������job���id Path submitJobDir = new Path(jobStagingArea, jobId.toString());//������������������������������������������������ID������ JobStatus status = null; try { conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName()); //��������������� conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");//������������Http��������������������������� conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());//������������job��������� LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir /* ���������������������������������������(token) */ TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); //���������NameNode��������������������� populateTokenCache(conf, job.getCredentials()); // generate a secret to authenticate shuffle transfers//������������Mapper���Reducer������������������������������������ if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { KeyGenerator keyGen; try { keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(SHUFFLE_KEY_LENGTH); } catch (NoSuchAlgorithmException e) { throw new IOException("Error generating shuffle secret key", e); } SecretKey shuffleKey = keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials()); } if (CryptoUtils.isEncryptedSpillEnabled(conf)) { conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1); LOG.warn("Max job attempts set to 1 since encrypted intermediate" + "data spill is enabled"); } copyAndConfigureFiles(job, submitJobDir);//���������������������������������HDFS������������������������10��������������������������������� Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);//������������������ // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); //������map������������������������map��������������������������������������� conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted" to job file. String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); //���������������������������������default��� AccessControlList acl = submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); //������acl������ // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(conf); //������Token��������������� if (conf.getBoolean( MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) { // Add HDFS tracking ids ��������������������������������� ArrayListtrackingIds = new ArrayList (); for (Token t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); //������������������������������ } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); //������������������ } // Set reservation info if it exists��������������������������������� ReservationId reservationId = job.getReservationId(); if (reservationId != null) { conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString()); } // Write job file to submit dir writeConf(conf, submitJobFile);//���conf���������������������.xml������ // // Now, actually submit the job (using the submit name) // printTokens(jobId, job.getCredentials());//������������,������YarnRunner.submitJob()���LocalJobRunner.submitJob() status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; //������������ } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); // ������������������ } }}
���submitJobInternal���������������������������������������������������������������������������������:
������������������������������������������RM������,���RM������������������������������;
������������������������RM������������,������������������������������������������������������������������������������������������IP���������������������������������������ID���,������������MapReduce���������������������������������,������������������������������������������(Token)������������������������������������������RM,������������������������������,������������������������������������������������������jar���������������������������������������������������������������������������,���������������������������������������������������������������������RM,������RM������������������������������������,������������������������������������������������������������HDFS���������������,���������������������������������������������������
������������������������������������������,���������HDFS������������������������������������������������HDFS������������������������������������������������������������,���������������������(stagingdirectory)���������������������������JobSubmissionFiles.getStagingDir()���������������������������������������������������������������ID,���JobId������������������������������������������������������������������,���������������������submitJobDir���������������������������������������������������,������������������������������������
���������������������������������map������������������������������������connect()������������������������YARNRunner������������LocalJobRunner������submitJob������������������������������������������RM���������������������������
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]
������������������
发表评论
最新留言
关于作者
