
本文共 9712 字,大约阅读时间需要 32 分钟。
需求
推荐好友的好友。
原始数据:
小明 老王 如花 林志玲老王 小明 凤姐如花 小明 李刚 凤姐林志玲 小明 李刚 凤姐 郭美美李刚 如花 凤姐 林志玲郭美美 凤姐 林志玲凤姐 如花 老王 林志玲 郭美美 李刚
思路
根据好友列表,先找出直接好友,生成好友列表。然后再根据好友列表,找出fof。去除已经是好友的fof。
案例实现
1.Mapper1
我们要根据原始数据,得到直接好友和fof。还要借助getFof()方法保持一致性,“老王与小明”及“小明与老王”,应该算作是1个。
然后我们就在map端直接向外输出直接好友、fof,并根据输出的value的不同来做区分。String user = key.toString();//key是经过输入格式化类处理过的,也就是用户自己 //制表符切割value得到数组,[老王,如花,林志玲],也就是用户自己的好友列表 String[] friends = value.toString().split("\t"); for (int i = 0; i < friends.length; i++) { //迭代好友列表 String f1 = friends[i]; String userAndFriend = getFof(user, f1);//保证顺序的一致性 context.write(new Text(userAndFriend), new IntWritable(-1));//输出直接好友,如小明:老王|小明:如花|小明:林志玲 for (int j = i+1; j < friends.length; j++) { String f2 = friends[j]; String fof = getFof(f1, f2); context.write(new Text(fof), new IntWritable(1));//输出fof,如老王:如花|老王:林志玲|如花:林志玲 } }
2.Reducer1
数据经过map端的处理后,抵达reduce端。我们要做的就是去除已经是好友的fof,并进行统计。这里分组无需定义,KEY相同的自然就是一组了。两人是fof关系的同时,也有可能是直接好友,所以我们要加一个flag进行判断。通过对拿到的数据循环,一旦发现value为-1的,就舍弃不要,直接break。发现value为1的才进行累加统计。
比如:假设直接好友关系的“小明:老王”,还有fof关系的“小明:老王”,这两组KEY相同,但是value一个是-1一个是1。这一组进来到reduce方法后,我就可以拿着直接好友关系的“小明:老王”,去判断、筛查fof关系的“小明:老王”。一旦发现有这样的情况,就直接break了,并且设置flag为false。flag为false的时候,根本不会向外输出了。int sum = 0; boolean flag = true;//判断是否存在直接好友的fof for(IntWritable i:iter){ if (i.get()==-1) { //是直接好友了 flag=false; break; }else { sum+=i.get(); } } if (flag) { context.write(key, new IntWritable(sum)); }
经过上述过程的“甄别”,真正的fof关系的好友就重新出炉了,且我们还给他们打上了“sum”的标记(sum越大,就证明越应该推荐他俩互加好友呗!)。
3.第一次MR结果
小明:凤姐 3李刚:小明 2林志玲:如花 3老王:如花 2老王:李刚 1老王:林志玲 2郭美美:如花 1郭美美:小明 1郭美美:李刚 2郭美美:老王 1
接下来,我们还要根据fof出现的次数来降序排序,并给出推荐列表。有了上面的结果,我们还要再来一波MR才行!此时,上面的输出数据就是下一个MR的数据源。
接下来,我们要根据数值来排序,根据用户来分组。根据经验,我们应该把(用户、数字)作为键(与天气案例中以年、月、温度为键类似),4.Mapper2
经过第一轮MR,我们得到了诸如“小明:如花 3|李刚:小明 2”的结果。这样的数据会流进map端,被处理。由于数据要以数字排序,以用户分组,所以我们下一步想要得到的,就是“K(小明,3)”。小明和如花是fof关系,所以在推荐的时候,不光要把小明推荐给如花,还要把如花推荐给小明。因此,我们没读取一条数据,实际上是要输出两条的。以“小明:如花 3|李刚:小明 2”为例,经过处理后我们最想得到的就是:“①K(小明,3)V:如花 | ②K(如花,3)V:小明 | ③K(李刚,2) V:小明 | ④K(小明,2) V:李刚”
String[] users = key.toString().split(":");//将小明:凤姐按照“:”分割为数组 int count = Integer.parseInt(value.toString());//value就是3 //拼接K-V,使K为小明,3,V为凤姐。并且因为要相互推荐,所以要输出两条 Text k1 = new Text(users[0]+","+count); Text v1 = new Text(users[1]); context.write(k1, v1); Text k2 = new Text(users[1]+","+count); Text v2 = new Text(users[0]); context.write(k2, v2);
经过map端处理后,K为(小明,3),V为凤姐。再往后,还要经历spill to dask,并调用排序算法进行排序。
5.SortComparator
先比较用户是否相同,用户相同在对value进行降序排序
//流进来的数据为:小明,3 String[] kk1 = k1.toString().split(","); String[] kk2 = k2.toString().split(","); int r1 = kk1[0].compareTo(kk2[0]); if (r1==0){ return -Integer.compare(Integer.parseInt(kk1[1]), Integer.parseInt(kk2[1])); } return r1;
再往后还要经历分组
6.GroupComparator
分组只比较用户就可以了,结果直接return
//小明,3;按照“,”分割后,只比较用户 String[] kk1 =k1.toString().split(","); String[] kk2 =k2.toString().split(","); return kk1[0].compareTo(kk2[0]);
上述数据经历过分组后,“小明,3 如花 | 小明,2 李刚”就在一组了(因为我按照用户进行比较,也就是只比较小明),就要封装为迭代器(里面装的全是value,比如“小明组”里放的就是如花和李刚。目的就是为了让reduce能够迭代出来这些value)向reduce端传递了。我们期盼数据经过一次聚合后,得到“小明 如花3,李刚2”这样的。
7.Reducer2
reduce端开始对数据进行聚合,K:用户,V:推荐列表;循环取出值后,用StringBuffer进行拼接,并在最后去除最后一个制表符"\t"
StringBuffer sb =new StringBuffer(); //K:小明,3 String user =key.toString().split(",")[0]; for(Text v:iter){ //迭代小明这一组,得到所有的组员(如花、李刚),并append sb.append(v.toString()).append("\t"); } sb.substring(0, sb.length()-1); context.write(new Text(user), new Text(sb.toString()));
最终,经历上述两次Job,我们得到了想要的结果:
凤姐 小明 如花 林志玲 老王 郭美美 小明 凤姐 李刚 郭美美 李刚 郭美美 小明 老王 林志玲 如花 老王 老王 林志玲 如花 李刚 郭美美 郭美美 李刚 如花 小明 老王
完整代码:
package com.husky.hadoop.fof;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class RunJob { public static void main(String[] args) { Configuration conf = new Configuration(); try { Job job = Job.getInstance(); FileSystem fs = FileSystem.get(conf); job.setJobName("wc"); job.setJarByClass(RunJob.class); job.setMapperClass(Mapper1.class); job.setReducerClass(Reducer1.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setNumReduceTasks(1); //设置计算输入数据 FileInputFormat.addInputPath(job, new Path("/input/friend")); //设置计算输出目录(mapreduce计算完成之后,最后的结果存放的目录) Path outpath =new Path("/output/f1/"); //该目录必须不能存在,如果存在计算框架会出错 if(fs.exists(outpath)){ //如果存在该目录,则删除 fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); //开始执行 boolean f =job.waitForCompletion(true); if (f) { System.out.println("MR1执行成功!"); job =Job.getInstance(conf); job.setJobName("fof"); job.setJarByClass(RunJob.class); job.setMapperClass(Mapper2.class); job.setReducerClass(Reducer2.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setSortComparatorClass(SortComparator.class); job.setGroupingComparatorClass(GroupComparator.class);// job.setCombinerClass(WordCountReducer.class);//指定一个combiner的类 job.setNumReduceTasks(1); //设置reduce的数量 job.setInputFormatClass(KeyValueTextInputFormat.class); //设置计算输入数据 FileInputFormat.addInputPath(job, new Path("/output/f1/")); //设置计算输出目录(mapreduce计算完成之后,最后的结果存放的目录) outpath =new Path("/output/f2/"); //该目录必须不能存在,如果存在计算框架会出错 if(fs.exists(outpath)){ //如果存在该目录,则删除 fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); //开始执行 f =job.waitForCompletion(true); if(f){ System.out.println("MR2执行成功"); } } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 原始数据:小明 老王 如花 林志玲 * 因为输入格式化类使用的是KeyValueTextInputFormat.class,因此key:小明;value:老王 如花 林志玲 * */ static class Mapper1 extends Mapper{ @Override protected void map(Text key, Text value,Context context) throws IOException, InterruptedException { String user = key.toString();//key是经过输入格式化类处理过的,也就是用户自己 //制表符切割value得到数组,[老王,如花,林志玲],也就是用户自己的好友列表 String[] friends = value.toString().split("\t"); for (int i = 0; i < friends.length; i++) { //迭代好友列表 String f1 = friends[i]; String userAndFriend = getFof(user, f1);//保证顺序的一致性 context.write(new Text(userAndFriend), new IntWritable(-1));//输出直接好友,如小明:老王|小明:如花|小明:林志玲 for (int j = i+1; j < friends.length; j++) { String f2 = friends[j]; String fof = getFof(f1, f2); context.write(new Text(fof), new IntWritable(1));//输出fof,如老王:如花|老王:林志玲|如花:林志玲 } } } } static class Reducer1 extends Reducer { @Override protected void reduce(Text key, Iterable iter, Context context) throws IOException, InterruptedException { int sum = 0; boolean flag = true;//判断是否存在直接好友的fof for(IntWritable i:iter){ if (i.get()==-1) { //是直接好友了 flag=false; break; }else { sum+=i.get(); } } if (flag) { context.write(key, new IntWritable(sum)); } } } /** * 流进来的原始数据是:小明:凤姐 3 ,输入格式化类用的还是KeyValueTextInputFormat.class, * 所以制表符前面是map端的key,制表符后面是map端的value * */ static class Mapper2 extends Mapper { @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String[] users = key.toString().split(":");//将小明:凤姐按照“:”分割为数组 int count = Integer.parseInt(value.toString());//value就是3 //拼接K-V,使K为小明,3,V为凤姐。并且因为要相互推荐,所以要输出两条 Text k1 = new Text(users[0]+","+count); Text v1 = new Text(users[1]); context.write(k1, v1); Text k2 = new Text(users[1]+","+count); Text v2 = new Text(users[0]); context.write(k2, v2); } } static class SortComparator extends WritableComparator{ //必须要有构造 public SortComparator(){ super(Text.class,true); } public int compare(WritableComparable a,WritableComparable b){ Text k1 = (Text)a; Text k2 = (Text)b; //流进来的数据为:小明,3 String[] kk1 = k1.toString().split(","); String[] kk2 = k2.toString().split(","); int r1 = kk1[0].compareTo(kk2[0]); if (r1==0){ return -Integer.compare(Integer.parseInt(kk1[1]), Integer.parseInt(kk2[1])); } return r1; } } static class GroupComparator extends WritableComparator{ public GroupComparator(){ super(Text.class,true); } public int compare(WritableComparable a,WritableComparable b){ Text k1 =(Text) a; Text k2 =(Text) b; //小明,3;按照“,”分割后,只比较用户 String[] kk1 =k1.toString().split(","); String[] kk2 =k2.toString().split(","); return kk1[0].compareTo(kk2[0]); } } static class Reducer2 extends Reducer { @Override protected void reduce(Text key, Iterable iter, Context context) throws IOException, InterruptedException { StringBuffer sb =new StringBuffer(); //K:小明,3 String user =key.toString().split(",")[0]; for(Text v:iter){ //迭代小明这一组,得到所有的组员,并append sb.append(v.toString()).append("\t"); } sb.substring(0, sb.length()-1); context.write(new Text(user), new Text(sb.toString())); } } /** * 保证一致性的顺序 * */ static String getFof(String user1,String user2){ if (user1.compareTo(user2)>0) { return user1+":"+user2; }else { return user2+":"+user1; } }}
发表评论
最新留言
关于作者
