
本文共 8520 字,大约阅读时间需要 28 分钟。
������������������������������������������������������������
JobSubmitter.submitJobInternal������������������
int maps = writeSplits(job, submitJobDir); //������map���������������map������������������������������������������������������������������������������������������������������������JobSubmitter.writeSplits���������
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); //������������������������������ } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps;}
������������������JobSubmitter.writeNewSplits������:
privateint writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); //���������������InputFormat��������������� List splits = input.getSplits(job); //������InputFormat������������getSplits������ T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); //������������������������������������������������������ JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);//������Split������ return array.length;}
������������������InputFormat������������������
public abstract class InputFormat{ //������������������������ public abstract List getSplits(JobContext context ) throws IOException, InterruptedException; //RecordReader������������������������������������������������������K-V������������������������������������������������InputSplit������������������ //������������������������nextKeyvalue()������������������������������������������K-V ������ public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;}
������������������������������������������������������
public class TextInputFormat extends FileInputFormat���
public abstract class FileInputFormat<K, V> extends InputFormat���public abstract class InputFormat���������TextInputFormat������������FileInputFormat������������������������������������������������FileInputFormat������TextInputFormat���������������������������������������
public class TextInputFormat extends FileInputFormat{ @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); //LineRecordReader���������FileSplit���������������start���������FileSplit������������������pos��������������������������������� //end������������������������in���������������������������������������������������������������������FileSplit��������������������������������� //key���value���������������������������K-V���������������������������������������getProgress()��������������������������������� //���������������������������������������K-V���������K-V������������������������������ return new LineRecordReader(recordDelimiterBytes); } @Override protected boolean isSplitable(JobContext context, Path file) { //��������������������������������������������������������������� final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; }}
������������������JobSubmitter.writeNewSplits���������������List<InputSplit> splits = input.getSplits(job);������������������TextInputFormat.getSplits()������������TextInputFormat���������FileInputFormat���������������������������FileInputFormat.getSplits()���������
public ListgetSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start();//��������������������������������� long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //������������������1 long maxSize = getMaxSplitSize(job); //������������long������������������������0x7fffffffffffffffL // generate splits List splits = new ArrayList (); List files = listStatus(job); //��������������������������� for (FileStatus file: files) { Path path = file.getPath(); //������������ long length = file.getLen(); //������������ if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) {//������������������������������������������������ blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { //������������ FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { //������������������������ long blockSize = file.getBlockSize(); //128M long splitSize = computeSplitSize(blockSize, minSize, maxSize); //���������������������������������128M long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //������������������������������������128M*1.1 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//f������������������������������ splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; // ��������������������������������������������������������� }// ������������������������������������������������0������������������������������ if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); }//������������������������������������������������ } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //������������������ //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); //������������NUM_INPUT_FILES sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits;}//public class FileSplit extends InputSplit implements Writable {// private Path file;//������������������ // private long start;//���������������������������(������)// private long length;//������������// private String[] hosts;//������������������������������������������������������// private SplitLocationInfo[] hostInfos;//���������������������������������,������������������ //}//makeSplit���������������������������protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { return new FileSplit(file, start, length, hosts, inMemoryHosts);}//���������������������protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize));}
������FileInputFormat.getSplits()������������������������������������ArraryList���������������������JobSubmitter.writeNewSplits������������
������������ArrayList���������������������������������������������������������������JobSplitWriter.createSplitFiles()������������split������������������������������������������������map������������
发表评论
最新留言
关于作者
