MapReduce Java API - Multiple Input Paths
Published: 2025-04-11


MapReduce Java API Example: Counting Word Frequencies

Building on the earlier single-file word-count example, this article explains in detail how to handle multiple input datasets (e.g. several txt files) in one job.

Implementing multi-file input:

Multiple input files can be handled with the addInputPath method of Hadoop's MultipleInputs class. It lets you register several input paths (each with its own InputFormat and Mapper) on a single MapReduce job, enabling batch processing across files.

The complete implementation follows:

Mapper class:

```java
package com.badao.multinput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class MultInputMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the input line on whitespace and emit (word, 1) for each token
        StringTokenizer stringTokenizer = new StringTokenizer(value.toString());
        while (stringTokenizer.hasMoreTokens()) {
            word.set(stringTokenizer.nextToken());
            context.write(word, one);
        }
    }
}
```
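The mapper's tokenization step uses only the JDK's StringTokenizer, so it can be sketched standalone without Hadoop. The input line below is made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizeDemo {
    public static void main(String[] args) {
        // Same whitespace tokenization the mapper applies to each input line
        String line = "hello world hello hadoop";
        List<String> tokens = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            tokens.add(st.nextToken());
        }
        // The mapper would emit each token as a (token, 1) pair
        System.out.println(tokens);  // prints [hello, world, hello, hadoop]
    }
}
```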
Reducer class:

```java
package com.badao.multinput;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MultInputReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts grouped under this word and emit (word, total)
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```
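Stripped of the Hadoop types, the reduce step is just a sum over the values the framework groups under one key. The example values below are illustrative:

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSumDemo {
    public static void main(String[] args) {
        // Values the shuffle phase would group under one key, e.g. "hello" -> [1, 1, 1]
        List<Integer> values = Arrays.asList(1, 1, 1);
        int sum = 0;
        for (int v : values) {
            sum += v;  // same accumulation as MultInputReducer.reduce
        }
        // TextOutputFormat writes "key<TAB>count" lines
        System.out.println("hello\t" + sum);  // prints hello	3
    }
}
```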
Job class:

```java
package com.badao.multinput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MultInputJob {
    public static void main(String[] args)
            throws InterruptedException, IOException, ClassNotFoundException {
        wordCountLocal();
    }

    public static void wordCountLocal()
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "multinputwordcount");
        job.setJarByClass(MultInputJob.class);
        job.setMapperClass(MultInputMapper.class);
        // Reuse MultInputReducer as the combiner (summing counts is associative
        // and commutative, so it is safe to pre-aggregate on the map side)
        job.setCombinerClass(MultInputReducer.class);
        job.setReducerClass(MultInputReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Register both input files on the same job
        Path path1 = new Path("D:\\words.txt");
        Path path2 = new Path("D:\\words2.txt");
        MultipleInputs.addInputPath(job, path1, TextInputFormat.class, MultInputMapper.class);
        MultipleInputs.addInputPath(job, path2, TextInputFormat.class, MultInputMapper.class);
        FileOutputFormat.setOutputPath(job, new Path("D:\\mulinputOut"));
        job.waitForCompletion(true);
    }
}
```

Note: the original listing set `IntSumReducer` as the combiner, a class from Hadoop's stock WordCount example that does not exist in this package; `MultInputReducer` performs the same summation and is used instead.

Run the Job class and inspect the output directory to see the result.

The implementation above shows how to process multiple input files within the MapReduce framework. With MultipleInputs.addInputPath, several input paths can easily be attached to the same MapReduce job, so multiple files are processed in a single batch.
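To see why adding both paths yields one merged count table, here is a plain-Java simulation of the whole map/shuffle/reduce pipeline over two made-up file contents; no Hadoop is needed, and the two strings stand in for words.txt and words2.txt:

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class MultiInputSimulation {
    public static void main(String[] args) {
        // Stand-ins for the two input files added via MultipleInputs.addInputPath
        String[] files = {"hello hadoop", "hello world"};
        // TreeMap keeps keys sorted, mimicking the sorted shuffle output
        Map<String, Integer> counts = new TreeMap<>();
        for (String file : files) {
            StringTokenizer st = new StringTokenizer(file);
            while (st.hasMoreTokens()) {
                // Map emit + reduce sum collapsed into one merge step
                counts.merge(st.nextToken(), 1, Integer::sum);
            }
        }
        // Words from both "files" are merged into a single result,
        // just as one job over two registered paths produces one output
        counts.forEach((w, c) -> System.out.println(w + "\t" + c));
        // prints:
        // hadoop	1
        // hello	2
        // world	1
    }
}
```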
