【MapReduce】基础案例 ---- 排序 ( 按手机号分区,每个区中按照总流量排序 )
发布日期:2021-05-07 02:49:17 浏览次数:26 分类:精选文章

本文共 8050 字,大约阅读时间需要 26 分钟。


文章目录


☠ WritableComparable排序案例(分区、排序)

▪ 案例

对排序好的文件进行分区、排序,按照手机号将数据分到不同的区中,并进行排序输出文件。


需求分析

  • 基于前一个需求,增加自定义分区类,分区按照手机号所属省份设置。
    在这里插入图片描述

代码实现

♦ 思路一:封装Bean类自定义排序,自定义分区类

PhonePartitioner分区类
package 第三章_MR框架原理.排序;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class PhonePartitioner extends Partitioner
{ @Override public int getPartition(FBP fbp, Text text, int numPartitions) { // key 是手机号 // value 是户主信息 // 1.获取手机号前三位 String phoneNum = text.toString().substring(0,3); // 2.定义分区数 注意:分区数必须从0开始 int partition = 4; if ("136".equals(phoneNum)){ partition = 0; } else if ("137".equals(phoneNum)){ partition = 1; } else if ("138".equals(phoneNum)){ partition = 2; }else if ("139".equals(phoneNum)){ partition = 3; } else { partition = 4; } return partition; }}


Bean类
package 第三章_MR框架原理.排序;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Flow bean used as the map-output key so the shuffle sorts records by
 * traffic totals. Carries upstream, downstream and total flow counters.
 *
 * NOTE(review): the article text is truncated right after the first branch
 * of compareTo ("else if (upFlow"); the secondary ordering below is
 * reconstructed from that fragment — confirm against the original tutorial.
 */
public class FBP implements WritableComparable<FBP> {

    private long upFlow;    // upstream traffic
    private long downFlow;  // downstream traffic
    private long sumFlow;   // upFlow + downFlow; primary sort key

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public FBP() {
    }

    /**
     * Orders beans by total flow descending; ties are broken by upstream
     * flow (reconstructed — see class note).
     */
    @Override
    public int compareTo(FBP bean) {
        int result = 0;
        if (sumFlow > bean.getSumFlow()) {
            result = -1;  // larger totals sort first (descending)
        } else if (sumFlow < bean.getSumFlow()) {
            result = 1;
        } else if (upFlow > bean.getUpFlow()) {
            result = -1;
        } else if (upFlow < bean.getUpFlow()) {
            result = 1;
        }
        return result;
    }

    /** Serializes the fields; order must match {@link #readFields} exactly. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserializes the fields in the same order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Tab-separated form matching the text-output column layout. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}


Mapper阶段
package 第三章_MR框架原理.排序;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses one line of the pre-sorted flow file and emits the flow bean as
 * the key (so the shuffle sorts by it) with the phone number as the value.
 *
 * Input line format (tab-separated): phone \t upFlow \t downFlow \t sumFlow
 * e.g. 13509468723 7335 110349 117684
 */
public class FBP_M extends Mapper<LongWritable, Text, FBP, Text> {

    // Reused across map() calls; Hadoop serializes on write, so this is safe.
    private final FBP k = new FBP();
    private final Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line.
        String line = value.toString();
        // 2. Split the tab-separated columns.
        String[] fields = line.split("\t");
        // 3. Populate the output key/value.
        String phone = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        long sumFlow = Long.parseLong(fields[3]);
        k.setUpFlow(upFlow);
        k.setDownFlow(downFlow);
        k.setSumFlow(sumFlow);
        v.set(phone);
        // 4. Emit (bean, phone).
        context.write(k, v);
    }
}


Reducer阶段
package 第三章_MR框架原理.排序;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Inverts the (bean, phone) pairs produced by the mapper so the final
 * output is "phone \t flows". Phones with identical flow totals arrive
 * grouped under one key, hence the loop over values.
 */
public class FBP_R extends Reducer<FBP, Text, Text, FBP> {

    @Override
    protected void reduce(FBP key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Swap key and value back: phone number first, flow bean second.
            context.write(value, key);
        }
    }
}


Driver阶段
package 第三章_MR框架原理.排序;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FBP_D {       public static void main(String[] args) {           Job job = null;        Configuration conf = new Configuration();        try {               // 获取job对象            job = Job.getInstance(conf);            // 配置            job.setMapperClass(FBP_M.class);            job.setReducerClass(FBP_R.class);            job.setJarByClass(FBP_D.class);            job.setMapOutputKeyClass(FBP.class);            job.setMapOutputValueClass(Text.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(FBP.class);            // 设置分区信息            job.setPartitionerClass(PhonePartitioner.class);            // 同时指定相应数量的reduce task            job.setNumReduceTasks(5);            // 设置输入输出路径            FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\全排output"));            FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\分区排output"));            // 提交job            boolean result = job.waitForCompletion(true);            System.exit(result?0:1);        } catch (Exception e ){               e.printStackTrace();        }    }}

在这里插入图片描述


♦ 思路二:不封装Bean类,按照默认对key排序,自定义分区类

Partition阶段
package 第七章_MR扩展案例.分区排序;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes records to reduce tasks by the first three digits of the phone
 * number at the start of the value. Partition numbers must start at 0 and
 * stay below the configured reduce-task count (5 in the driver).
 */
public class Partition extends Partitioner<IntWritable, Text> {

    @Override
    public int getPartition(IntWritable intWritable, Text text, int numPartitions) {
        // Three-digit phone prefix decides the partition; others fall to 4.
        String prefix = text.toString().substring(0, 3);
        switch (prefix) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}

Mapper阶段
package 第七章_MR扩展案例.分区排序;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Emits the negated total flow as an IntWritable key so the framework's
 * default ascending sort on the key yields descending flow totals; the
 * value keeps the remaining columns (phone, upFlow, downFlow).
 */
public class PSMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    // Reused across map() calls; Hadoop serializes on write, so this is safe.
    private final Text v = new Text();
    private final IntWritable k = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Read one line: phone \t upFlow \t downFlow \t sumFlow.
        String line = value.toString();
        // 2. Split and pull out the total-flow column.
        String[] fields = line.split("\t");
        int sumFlow = Integer.parseInt(fields[3]);
        String info = fields[0] + "\t" + fields[1] + "\t" + fields[2];
        // Negate so ascending key order means descending flow totals.
        k.set(-sumFlow);
        v.set(info);
        // 3. Emit (-sumFlow, "phone \t up \t down").
        context.write(k, v);
    }
}

Reducer阶段
package 第七章_MR扩展案例.分区排序;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Restores the original (positive) total flow that the mapper negated for
 * sorting, and inverts the pair so the output reads "info \t sumFlow".
 */
public class PSReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

    private final IntWritable v = new IntWritable();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Undo the negation applied by the mapper before writing out.
            v.set(-key.get());
            context.write(value, v);
        }
    }
}

Driver阶段
package 第七章_MR扩展案例.分区排序;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PSDriver {       public static void main(String[] args) {           Job job;        Configuration conf = new Configuration();        try{               // 1. 获取job            job = Job.getInstance(conf);            // 2. 配置            job.setMapperClass(PSMapper.class);            job.setReducerClass(PSReducer.class);            job.setJarByClass(PSDriver.class);            job.setMapOutputKeyClass(IntWritable.class);            job.setMapOutputValueClass(Text.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(IntWritable.class);            // 3.设置分区关联            job.setPartitionerClass(Partition.class);            //   设置ReduceTask个数            job.setNumReduceTasks(5);            // .输入输出数据路径            FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\排序\\全排output\\part-r-00000"));            FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第七章_MR扩展案例\\分区排序\\psoutput"));            // 4.提交job            boolean result = job.waitForCompletion(true);            System.exit(result ? 0:1);        } catch (Exception e){               e.printStackTrace();        }    }}

在这里插入图片描述


上一篇:【Python+Flask+Echarts】可视化练习题 ---- 餐饮数据雷达图
下一篇:【Python】wordcloud 词云图

发表评论

最新留言

很好
[***.229.124.182]2025年04月14日 02时27分18秒