
Requirement:
1949-10-01 14:21:02	34c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c
Find the top three highest temperatures in each month of each year, and write each year's results to its own output file.
Requirement analysis:
Grouping will again use a comparator. We need to group by year and month, so the custom KEY gets fairly complex: it contains year, month, and temperature.
Implementation:
In the MR process the key is used for grouping and sorting. When Hadoop's built-in key types cannot satisfy the requirement, a custom key is needed, so let's define one first.
1. Custom Key
The key we ran into most often in our earlier Mapper classes is Text. Look at its declaration for reference: public class Text extends BinaryComparable implements WritableComparable<BinaryComparable>. The class implements the WritableComparable interface. Per the Javadoc on that interface:
Example:
public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
    // Some data
    private int counter;
    private long timestamp;

    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    public int compareTo(MyWritableComparable o) {
        // (the Javadoc sample references a nonexistent "value" field; using counter here)
        int thisValue = this.counter;
        int thatValue = o.counter;
        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
    }

    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + counter;
        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
        return result;
    }
}
Two interfaces deserve mention here: Writable and WritableComparable. In MR, a class that implements Writable can be used as a value, while a class that implements WritableComparable can be used as a key or as a value. We can define our own MyWritableComparable implementing this interface. WritableComparable itself extends two interfaces: Writable defines serialization and deserialization, and Comparable is responsible for comparison.
During spill to disk, the quicksort calls a comparator: a user-defined sort comparator takes precedence, and only if none is set does the framework fall back to the key's own comparison. Given the requirement, it makes sense to put the temperature into the KEY so it participates in the comparison. On top of that, we will group for aggregation, and the grouping has to be done by "year-month"; aggregation then happens within each group.
package com.husky.hadoop.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyKey implements WritableComparable<MyKey> {

    private int year;
    private int month;
    private double temperature;

    public MyKey() {
        super();
    }

    public MyKey(int year, int month, double temperature) {
        super();
        this.year = year;
        this.month = month;
        this.temperature = temperature;
    }

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }

    /**
     * Write the object out to the stream: serialization.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(temperature);
    }

    /**
     * Read the object back from the input stream: deserialization.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.temperature = in.readDouble();
    }

    /**
     * The key's comparison method, called during sorting. Returns 0, a positive or a negative number.
     * We cannot compare only the temperature; temperatures are compared only when year and month are equal.
     */
    @Override
    public int compareTo(MyKey o) {
        int r1 = Integer.compare(this.getYear(), o.getYear());
        if (r1 == 0) {
            int r2 = Integer.compare(this.getMonth(), o.getMonth());
            if (r2 == 0) {
                // descending order
                return -Double.compare(this.getTemperature(), o.getTemperature());
            }
            return r2;
        }
        return r1;
    }
}
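A side note on the comparator precedence mentioned earlier: if a sort comparator is registered on the job with job.setSortComparatorClass(...), the quicksort during spill calls it instead of MyKey.compareTo(). A minimal sketch of what that would look like; MySortComparator is illustrative only and not part of this case's final code, which relies on MyKey.compareTo() instead:

package com.husky.hadoop.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Illustrative sketch: once registered via job.setSortComparatorClass(MySortComparator.class),
// this comparator is what the sort/spill phase uses, taking precedence over MyKey.compareTo().
public class MySortComparator extends WritableComparator {

    public MySortComparator() {
        super(MyKey.class, true); // true: create key instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 != 0) {
            return r1;
        }
        int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
        if (r2 != 0) {
            return r2;
        }
        // descending by temperature, mirroring MyKey.compareTo()
        return -Double.compare(k1.getTemperature(), k2.getTemperature());
    }
}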
2. Mapper class
map first reads one line of data: 1949-10-01 14:21:02 34c -> MyKey(1949,10,34):Text
By default the K-V is the byte offset and the record read, which comes from the default TextInputFormat, but we can swap that out. Use KeyValueTextInputFormat instead, which relies on KeyValueLineRecordReader:
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
            return true;
        }
        return codec instanceof SplittableCompressionCodec;
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
            TaskAttemptContext context) throws IOException {
        context.setStatus(genericSplit.toString());
        return new KeyValueLineRecordReader(context.getConfiguration());
    }
}
Take a look at a key method of KeyValueLineRecordReader, setKeyValue:
public static void setKeyValue(Text key, Text value, byte[] line, int lineLen, int pos) {
    if (pos == -1) {
        key.set(line, 0, lineLen);
        value.set("");
    } else {
        key.set(line, 0, pos);
        value.set(line, pos + 1, lineLen - pos - 1);
    }
}
pos == -1 means no tab character was found, so the whole line becomes the key and the value is set to "". Otherwise, everything before the tab is the key (1949-10-01 14:21:02) and everything after it is the value (34c). So KEYIN is Text.
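A related detail: the separator defaults to the tab character but is configurable. A minimal sketch, assuming Hadoop 2.x or later, where KeyValueLineRecordReader exposes the property name as a constant (the "SEPERATOR" misspelling is Hadoop's own):

package com.husky.hadoop.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

public class SeparatorDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Override the default "\t" separator, e.g. for comma-separated records;
        // this must be set before the Job is created from this conf.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
        System.out.println(conf.get(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR));
    }
}

Our sample data is tab-separated, so the default is fine here.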
package com.husky.hadoop.weather;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<Text, Text, MyKey, Text> {

    static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // date parser

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // KEY: 1949-10-01 14:21:02 ; VALUE: 34c
        try {
            // pull the year and month out of the parsed date
            Date date = sdf.parse(key.toString());
            Calendar c = Calendar.getInstance();
            c.setTime(date);
            int year = c.get(Calendar.YEAR);
            int month = c.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
            // cut the trailing unit off "34c" to get 34
            double temperature = Double.parseDouble(
                    value.toString().substring(0, value.toString().length() - 1));
            MyKey outkey = new MyKey(year, month, temperature);
            Text outvalue = new Text(key + "\t" + value);
            context.write(outkey, outvalue);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
3. Custom partitioner class
After a record comes out of Map, each K-V pair is stamped with a partition number P that determines which partition it will end up in. Obviously the partition number cannot be derived from the value here; it has to come from the year:
package com.husky.hadoop.weather;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// the data types coming out of the Mapper are <MyKey, Text>
public class MyPartitioner extends Partitioner<MyKey, Text> {

    @Override
    public int getPartition(MyKey key, Text value, int numPartitions) {
        // the number of partitions equals the number of reduces, 3 in this case
        return key.getYear() % numPartitions;
    }
}
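With numPartitions = 3 this happens to spread our three years evenly: 1949 % 3 = 2, 1950 % 3 = 0, and 1951 % 3 = 1, so each year lands in its own partition and therefore in its own output file, which is exactly what the requirement asks for.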
4. Custom grouping comparator
Group by year and month. By this point the data is already sorted, so temperature no longer needs to be considered; we only care about year and month for grouping. A custom grouping comparator must provide a constructor and a compare method.
package com.husky.hadoop.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupCompareTo extends WritableComparator {

    // constructor: tell the parent which key class to compare;
    // true means key instances are created for deserialization
    public MyGroupCompareTo() {
        super(MyKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        // cast down to the concrete key type
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        // compare year first, then month, and return the result directly
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            return Integer.compare(k1.getMonth(), k2.getMonth()); // 0 if same month
        }
        return r1; // non-zero
    }
}
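To make that concrete with our sample data: under this comparator the keys (1950, 10, 41), (1950, 10, 37) and (1950, 10, 27) all compare as equal, so their three values are handed to a single reduce() call as one group, while (1950, 1, 32) starts a separate group of its own.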
5. Custom Reducer class
Once the data has been grouped, it can flow into the Reducer. The Reducer's input is the Mapper's output, so KEYIN is MyKey and VALUEIN is Text. The Reducer's output, by contrast, is quite free: KEYOUT can be Text or NullWritable, and the same goes for VALUEOUT. What we feed the Reducer is the whole record, "1949-10-01 14:21:02 34c".
package com.husky.hadoop.weather;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherReducer extends Reducer<MyKey, Text, Text, NullWritable> {

    @Override
    protected void reduce(MyKey key, Iterable<Text> iter, Context context)
            throws IOException, InterruptedException {
        int num = 0;
        for (Text value : iter) {
            if (num >= 3) {
                break;
            }
            // key: 1950-10-02 12:21:02 41c; value is null.
            // It could equally be the other way round.
            context.write(value, NullWritable.get());
            num++;
        }
    }
}
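For the group (1950, 10), for example, the values arrive already sorted descending by temperature thanks to MyKey.compareTo(), so the loop emits "1950-10-02 12:21:02 41c", "1950-10-01 12:21:02 37c" and "1950-10-03 12:21:02 27c", and would stop after three even if the month had more records.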
6. Client class
Before submitting the job, the grouping comparator, the input format class and the partitioner all need to be configured.
package com.husky.hadoop.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        // the client reads the configuration files automatically and loads them into conf
        Configuration conf = new Configuration(true);
        try {
            // job
            Job job = Job.getInstance(conf);
            FileSystem fs = FileSystem.get(conf);
            // required: the entry class
            job.setJarByClass(RunJob.class);
            // set the job name
            job.setJobName("weather");
            // set the Mapper and Reducer
            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            // set the grouping comparator
            job.setGroupingComparatorClass(MyGroupCompareTo.class);
            // drop the default TextInputFormat in favor of KeyValueTextInputFormat
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            // set the custom partitioner
            job.setPartitionerClass(MyPartitioner.class);
            // map output K-V types (they differ from the final output types)
            job.setMapOutputKeyClass(MyKey.class);
            job.setMapOutputValueClass(Text.class);
            // final output K-V types, matching the Reducer
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // set the number of reduces, default 1
            job.setNumReduceTasks(3);
            // the input data, path being a file path on HDFS
            FileInputFormat.addInputPath(job, new Path("/input/weather"));
            // the output directory holding the final results; it must not already exist
            Path outPath = new Path("/output/weather/");
            if (fs.exists(outPath)) {
                // delete the directory if it already exists
                fs.delete(outPath, true);
            }
            FileOutputFormat.setOutputPath(job, outPath);
            // run
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("MapReduce job finished successfully!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
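To submit it, something like the following is typical; the jar name weather.jar is an assumption, the class name and HDFS paths come from the code above, and part-r-00000 is the standard name of the first reducer's output file:

hadoop jar weather.jar com.husky.hadoop.weather.RunJob
hdfs dfs -cat /output/weather/part-r-00000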
Execution result
The partition count was defined around the years; the content of one of the partitions is:
1950-01-01 11:21:02	32c
1950-10-02 12:21:02	41c
1950-10-01 12:21:02	37c
1950-10-03 12:21:02	27c
