Hadoop_MapReduce取分组后的最大值
发布日期:2021-05-07 00:19:17 浏览次数:20 分类:精选文章

本文共 2253 字,大约阅读时间需要 7 分钟。

Hadoop_MapReduce取分组后的最大值 

package com.lius.hadoop.mapReduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that groups tab-separated input records by one column and
 * emits the numeric maximum per group.
 *
 * <p>Input lines are split on {@code \t}; column index 3 becomes the group
 * key and column index 0 the candidate value. Output is one
 * {@code key \t max} line per group.
 *
 * @author Administrator
 */
public class parationerKeyAndGetMaxValue {

    /**
     * Mapper: for each tab-separated line, emits (column 3 as the group key,
     * column 0 as the value to maximize over).
     */
    static class keyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            // NOTE(review): assumes every line has at least 4 tab-separated
            // columns; shorter lines would throw ArrayIndexOutOfBoundsException.
            context.write(new Text(values[3]), new Text(values[0]));
        }
    }

    /**
     * Reducer: keeps the numeric maximum of all values observed for a key and
     * writes it out as text.
     */
    static class keyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Start from MIN_VALUE (not 0) so groups whose values are all
            // negative still yield the true maximum.
            long result = Long.MIN_VALUE;
            for (Text v : value) {
                result = Math.max(result, Long.parseLong(v.toString()));
            }
            context.write(key, new Text(String.valueOf(result)));
        }
    }

    /**
     * Configures and submits the job: wires mapper/reducer, fixed HDFS
     * input/output paths, and deletes a stale output directory so reruns
     * do not fail with "output directory already exists".
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Local jar used when submitting from the developer machine.
        conf.set("mapred.jar", "E://jars/keyGroupGetMaxValue.jar");

        Job job = Job.getInstance(conf);
        job.setJarByClass(parationerKeyAndGetMaxValue.class);
        job.setMapperClass(keyMapper.class);
        job.setReducerClass(keyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inPath = new Path("/usr/lius/parationData");
        Path outPath = new Path("/usr/lius/keyGroupGetMaxValue");
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.waitForCompletion(true);
    }
}

 

上一篇:Hadoop_Java操作Hbase
下一篇:Hadoop_MapReduce-Key的分组和排序

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2025年04月18日 10时37分43秒