
Hadoop MapReduce: Taking the Maximum Value per Group
Published: 2021-05-07 00:19:17
The complete job is below: the mapper keys each record on its fourth tab-separated column and emits the first column as the value, and the reducer keeps the largest value seen for each key. (The generic type parameters, which were stripped when the post was scraped, have been restored.)

package com.lius.hadoop.mapReduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce: take the maximum value within each group.
 * @author Administrator
 */
public class parationerKeyAndGetMaxValue {

    // Mapper: for every tab-separated input line, emit (column 4 as the group key, column 1 as the value).
    static class keyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            context.write(new Text(values[3]), new Text(values[0]));
        }
    }

    // Reducer: for each group key, keep the largest numeric value among its values.
    static class keyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            long result = Long.MIN_VALUE; // start below any possible value so negative inputs are handled correctly
            Iterator<Text> valueIters = value.iterator();
            while (valueIters.hasNext()) {
                long val = Long.parseLong(valueIters.next().toString());
                result = result > val ? result : val;
            }
            context.write(key, new Text(String.valueOf(result)));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.jar", "E://jars/keyGroupGetMaxValue.jar");

        Job job = Job.getInstance(conf);
        job.setJarByClass(parationerKeyAndGetMaxValue.class);
        job.setMapperClass(keyMapper.class);
        job.setReducerClass(keyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inPath = new Path("/usr/lius/parationData");
        Path outPath = new Path("/usr/lius/keyGroupGetMaxValue");

        // Delete the output directory if it already exists, otherwise the job fails at submission time.
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }
}
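As a rough illustration of what the job does (the column meanings and sample values below are assumptions made for this sketch, not data from the original post): suppose /usr/lius/parationData holds tab-separated lines whose first column is a numeric ID and whose fourth column is a group name, for example

1995	a	b	groupA
3001	a	b	groupA
4200	a	b	groupB

The mapper would emit (groupA, 1995), (groupA, 3001) and (groupB, 4200), and the reducer would write one line per group holding the largest value:

groupA	3001
groupB	4200

The packaged jar can then be submitted in the usual way, e.g. hadoop jar keyGroupGetMaxValue.jar com.lius.hadoop.mapReduce.parationerKeyAndGetMaxValue (the jar name here is assumed to match the mapred.jar path set in the driver). Since the input and output paths are hard-coded in main, no extra command-line arguments are needed.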