
【MapReduce】基础案例 ---- 排序 ( 按手机好分区,每个区中按照总流量排序 )
发布日期:2021-05-07 02:49:17
浏览次数:26
分类:精选文章
本文共 8050 字,大约阅读时间需要 26 分钟。
文章目录
☠ WritableComparable排序案例(分区、排序)
▪ 案例
对排序好的文件进行分区、排序,按照手机号将数据分到不同的区中,并进行排序输出文件。
需求分析
- 基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
代码实现
♦ 思路一:封装Bean类自定义排序,自定义分区类
PhonePartitioner分区类
package 第三章_MR框架原理.排序;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class PhonePartitioner extends Partitioner{ @Override public int getPartition(FBP fbp, Text text, int numPartitions) { // key 是手机号 // value 是户主信息 // 1.获取手机号前三位 String phoneNum = text.toString().substring(0,3); // 2.定义分区数 注意:分区数必须从0开始 int partition = 4; if ("136".equals(phoneNum)){ partition = 0; } else if ("137".equals(phoneNum)){ partition = 1; } else if ("138".equals(phoneNum)){ partition = 2; }else if ("139".equals(phoneNum)){ partition = 3; } else { partition = 4; } return partition; }}
Bean类
package 第三章_MR框架原理.排序;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class FBP implements WritableComparable{ private long upFlow; private long downFlow; private long sumFlow; public FBP() { } @Override public int compareTo(FBP bean) { int result = 0; if (sumFlow>bean.getSumFlow()){ result = -1; }else if (upFlow
Mapper阶段
package 第三章_MR框架原理.排序;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FBP_M extends Mapper{ FBP k = new FBP(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 13509468723 7335 110349 117684 // 13975057813 11058 48243 59301 // 1.读取一行数据 String line = value.toString(); // 2.分割 String[] fields = line.split("\t"); // 3.封装对象 String phone = fields[0]; long upFlow = Long.parseLong(fields[1]); long downFlow = Long.parseLong(fields[2]); long sumFlow = Long.parseLong(fields[3]); k.setUpFlow(upFlow); k.setDownFlow(downFlow); k.setSumFlow(sumFlow); v.set(phone); // 4.写入 context.write(k,v); }}
Reducer阶段
package 第三章_MR框架原理.排序;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FBP_R extends Reducer{ @Override protected void reduce(FBP key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value:values){ // 写出 context.write(value,key); } }}
Driver阶段
package 第三章_MR框架原理.排序;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FBP_D { public static void main(String[] args) { Job job = null; Configuration conf = new Configuration(); try { // 获取job对象 job = Job.getInstance(conf); // 配置 job.setMapperClass(FBP_M.class); job.setReducerClass(FBP_R.class); job.setJarByClass(FBP_D.class); job.setMapOutputKeyClass(FBP.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FBP.class); // 设置分区信息 job.setPartitionerClass(PhonePartitioner.class); // 同时指定相应数量的reduce task job.setNumReduceTasks(5); // 设置输入输出路径 FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\全排output")); FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\分区排output")); // 提交job boolean result = job.waitForCompletion(true); System.exit(result?0:1); } catch (Exception e ){ e.printStackTrace(); } }}
♦ 思路二:不封装Bean类,按照默认对key排序,自定义分区类
Partition阶段
package 第七章_MR扩展案例.分区排序;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class Partition extends Partitioner{ @Override public int getPartition(IntWritable intWritable, Text text, int numPartitions) { String phoneNum = text.toString().substring(0,3); int partition = 4; if ("136".equals(phoneNum)){ partition = 0; }else if ("137".equals(phoneNum)){ partition = 1; }else if ("138".equals(phoneNum)){ partition = 2; }else if ("139".equals(phoneNum)){ partition = 3; }else { partition = 4; } return partition; }}
Mapper阶段
package 第七章_MR扩展案例.分区排序;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PSMapper extends Mapper{ Text v = new Text(); IntWritable k = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1. 读取一行数据 String line = value.toString(); // 2.拆分 String[] fields = line.split("\t"); int sunFlow = Integer.parseInt(fields[3]); String info = fields[0]+"\t"+fields[1]+"\t"+fields[2]; k.set(-sunFlow); v.set(info); // 3.写出 context.write(k,v); }}
Reducer阶段
package 第七章_MR扩展案例.分区排序;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PSReducer extends Reducer{ IntWritable v = new IntWritable(); @Override protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value:values){ v.set(-key.get()); context.write(value,v); } }}
Driver阶段
package 第七章_MR扩展案例.分区排序;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PSDriver { public static void main(String[] args) { Job job; Configuration conf = new Configuration(); try{ // 1. 获取job job = Job.getInstance(conf); // 2. 配置 job.setMapperClass(PSMapper.class); job.setReducerClass(PSReducer.class); job.setJarByClass(PSDriver.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 3.设置分区关联 job.setPartitionerClass(Partition.class); // 设置ReduceTask个数 job.setNumReduceTasks(5); // .输入输出数据路径 FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\全排output\\part-r-00000")); FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\分区排序\\psoutput")); // 4.提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0:1); } catch (Exception e){ e.printStackTrace(); } }}
发表评论
最新留言
很好
[***.229.124.182]2025年04月14日 02时27分18秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
多媒体文件格式全解说(下)--图片
2021-05-09
淘宝WAP版小BUG分析
2021-05-09
NodeJS+Express+MongoDB
2021-05-09
(四十四)c#Winform自定义控件-水波-HZHControls
2021-05-09
c#winform主题实现的一个方法
2021-05-09
asp.net打印网页后自动关闭网页【无需插件】
2021-05-09
一个人开发的html整站源码分享网站就这么上线了
2021-05-09
SQLServer 查看耗时较多的SQL语句(转)
2021-05-09
【计算机网络】应用层
2021-05-09
【Maven】POM基本概念
2021-05-09
【Java思考】Java 中的实参与形参之间的传递到底是值传递还是引用传递呢?
2021-05-09
【设计模式】单例模式
2021-05-09
【SpringCloud】Hystrix熔断器
2021-05-09
【Linux】2.3 Linux目录结构
2021-05-09
java.util.Optional学习笔记
2021-05-09
远程触发Jenkins的Pipeline任务的并发问题处理
2021-05-09
jackson学习之七:常用Field注解
2021-05-09
jackson学习之八:常用方法注解
2021-05-09
Web应用程序并发问题处理的一点小经验
2021-05-09
asp.net core的授权过滤器中获取action上的Attribute
2021-05-09