MapReduce Java API实例-统计单词出现频率
发布日期:2025-04-11 23:31:41 浏览次数:9 分类:精选文章

本文共 4896 字,大约阅读时间需要 16 分钟。

Windows下使用Java API操作HDFS的常用方法

在使用Java API操作HDFS之前,需要先配置好开发环境。接下来,我们将详细介绍如何使用Java API实现Mapreduce统计单次出现的次数。Hadoop集群搭建的是Hadoop2.8.0,因此我们需要新建一个Maven项目并引入相应的依赖包。

引入依赖包

为了能够使用Java API操作HDFS,我们需要引入以下依赖包:

org.apache.hadoop
hadoop-client
2.8.0
org.apache.hadoop
hadoop-common
2.8.0
org.apache.hadoop
hadoop-hdfs
2.8.0
org.apache.hadoop
hadoop-hdfs-client
2.8.0
org.apache.hadoop
hadoop-mapreduce-client-core
2.8.0
junit
junit
4.12
test

实现Mapreduce统计单词出现次数

接下来,我们将编写一个简单的Mapreduce程序,统计单词出现的次数。程序将包括以下几个步骤:

  • 新建数据集words.txt
  • 编写Map类
  • 编写Reduce类
  • 编写Job类
  • 1. 新建数据集words.txt

    首先,我们需要创建一个名为words.txt的数据集。例如:

    hello world hadoop bigdata java hadoop

    2. 编写Map类

    Map类继承自Mapper类,用于处理输入的数据。以下是WorldCountMapper类的实现:

    package com.badao.mapreducedemo;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.StringTokenizer;public class WorldCountMapper extends Mapper
    { private Text word = new Text(); private IntWritable one = new IntWritable(1); @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer(value.toString()); while (stringTokenizer.hasMoreTokens()) { word.set(stringTokenizer.nextToken()); context.write(word, one); } }}

    3. 编写Reduce类

    Reduce类继承自Reducer类,用于处理Map阶段输出的数据。以下是WordCountReducer类的实现:

    package com.badao.mapreducedemo;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer
    { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterable
    values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); }}

    4. 编写Job类

    Job类负责提交Mapreduce任务。以下是WorldCountJob类的实现:

    package com.badao.mapreducedemo;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;import java.io.IOException;public class WorldCountJob {    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {        wordCountLocal();    }    public static void wordCountLocal() throws IOException, ClassNotFoundException, InterruptedException {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf, "wordcount");        job.setJarByClass(WorldCountJob.class);        job.setMapperClass(WorldCountMapper.class);        job.setCombinerClass(IntSumReducer.class);        job.setReducerClass(WordCountReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        FileInputFormat.addInputPath(job, new Path("D:\\words.txt"));        FileOutputFormat.setOutputPath(job, new Path("D:\\badao"));        job.waitForCompletion(true);    }    public static void wordCountColony() throws IOException, ClassNotFoundException, InterruptedException {        Configuration conf = new Configuration();        conf.set("fs.defaultFS", "hdfs://192.168.148.128:9000");        System.setProperty("HADOOP_USER_NAME", "root");        Job job = Job.getInstance(conf, "wordcount");        job.setJarByClass(WorldCountJob.class);        job.setMapperClass(WorldCountMapper.class);        job.setCombinerClass(IntSumReducer.class);        job.setReducerClass(WordCountReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        FileInputFormat.addInputPath(job, new Path("/words.txt"));        FileOutputFormat.setOutputPath(job, new Path("/badao9"));        job.waitForCompletion(true);    }}

    使用说明

  • 本地文件处理

    • 修改wordCountLocal()方法中的输入路径和输出路径,确保输入文件words.txt存在于指定路径。-运行程序时,调用WorldCountJob.wordCountLocal()方法。
  • 集群HDFS处理

    • 修改wordCountColony()方法中的输入路径和输出路径,确保输入文件已上传到HDFS集群,并且有权限访问。-运行程序时,调用WorldCountJob.wordCountColony()方法。
  • 注意事项

    • 依赖包:确保所有依赖包已正确引入,路径正确无误。
    • 路径配置:输入路径必须存在,输出路径必须不存在。
    • 权限:在集群环境中,确保用户有权限访问输入路径和输出路径。

    通过以上步骤,我们可以在Windows环境下使用Java API对HDFS中的文本文件进行单词频率统计。

    上一篇:MapReduce Java API实例-统计平均成绩
    下一篇:MapReduce Java API实例-统计出现过的单词

    发表评论

    最新留言

    第一次来,支持一个
    [***.219.124.196]2025年05月19日 22时00分06秒