hadoop2.7作业提交详解之文件分片
发布日期:2021-05-09 09:32:52 浏览次数:18 分类:博客文章

本文共 8520 字,大约阅读时间需要 28 分钟。

������������������������������������������������������������

JobSubmitter.submitJobInternal������������������

int maps = writeSplits(job, submitJobDir); //������map���������������map������������������������������������������������������������������������������������

������������������������JobSubmitter.writeSplits���������

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,    Path jobSubmitDir) throws IOException,    InterruptedException, ClassNotFoundException {  JobConf jConf = (JobConf)job.getConfiguration();  int maps;  if (jConf.getUseNewMapper()) {    maps = writeNewSplits(job, jobSubmitDir); //������������������������������  } else {    maps = writeOldSplits(jConf, jobSubmitDir);  }  return maps;}

������������������JobSubmitter.writeNewSplits������:

private 
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat
input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); //���������������InputFormat��������������� List
splits = input.getSplits(job); //������InputFormat������������getSplits������ T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); //������������������������������������������������������ JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);//������Split������ return array.length;}

������������������InputFormat������������������

public abstract class InputFormat
{ //������������������������ public abstract List
getSplits(JobContext context ) throws IOException, InterruptedException; //RecordReader������������������������������������������������������K-V������������������������������������������������InputSplit������������������ //������������������������nextKeyvalue()������������������������������������������K-V ������ public abstract RecordReader
createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;}

������������������������������������������������������

public class TextInputFormat extends FileInputFormat���

public abstract class FileInputFormat<K, V> extends InputFormat���
public abstract class InputFormat���

������TextInputFormat������������FileInputFormat������������������������������������������������FileInputFormat������TextInputFormat���������������������������������������

public class TextInputFormat extends FileInputFormat
{ @Override public RecordReader
createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); //LineRecordReader���������FileSplit���������������start���������FileSplit������������������pos��������������������������������� //end������������������������in���������������������������������������������������������������������FileSplit��������������������������������� //key���value���������������������������K-V���������������������������������������getProgress()��������������������������������� //���������������������������������������K-V���������K-V������������������������������ return new LineRecordReader(recordDelimiterBytes); } @Override protected boolean isSplitable(JobContext context, Path file) { //��������������������������������������������������������������� final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; }}

������������������JobSubmitter.writeNewSplits���������������List<InputSplit> splits = input.getSplits(job);������������������TextInputFormat.getSplits()������������TextInputFormat���������FileInputFormat���������������������������FileInputFormat.getSplits()���������

public List
getSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start();//��������������������������������� long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //������������������1 long maxSize = getMaxSplitSize(job); //������������long������������������������0x7fffffffffffffffL // generate splits List
splits = new ArrayList
(); List
files = listStatus(job); //��������������������������� for (FileStatus file: files) { Path path = file.getPath(); //������������ long length = file.getLen(); //������������ if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) {//������������������������������������������������ blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { //������������ FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { //������������������������ long blockSize = file.getBlockSize(); //128M long splitSize = computeSplitSize(blockSize, minSize, maxSize); //���������������������������������128M long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //������������������������������������128M*1.1 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//f������������������������������ splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; // ��������������������������������������������������������� }// ������������������������������������������������0������������������������������ if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); }//������������������������������������������������ } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //������������������ //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); //������������NUM_INPUT_FILES sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits;}//public class FileSplit extends InputSplit implements Writable {// private Path file;//������������������ // private long start;//���������������������������(������)// private long length;//������������// private String[] hosts;//������������������������������������������������������// private SplitLocationInfo[] hostInfos;//���������������������������������,������������������ //}//makeSplit���������������������������protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { return new FileSplit(file, start, length, hosts, inMemoryHosts);}//���������������������protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize));}

������FileInputFormat.getSplits()������������������������������������ArraryList���������������������JobSubmitter.writeNewSplits������������

������������ArrayList���������������������������������������������������������������JobSplitWriter.createSplitFiles()������������split������������������������������������������������map������������

上一篇:java字符串详解
下一篇:hadoop2.7之作业提交详解(下)

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年04月20日 04时19分06秒