一篇文章搞懂fof好友推荐案例
发布日期:2021-05-06 23:32:58 浏览次数:23 分类:精选文章

本文共 9712 字,大约阅读时间需要 32 分钟。

需求

推荐好友的好友。

原始数据:

小明	老王	如花	林志玲老王	小明	凤姐如花	小明	李刚	凤姐林志玲	小明	李刚	凤姐	郭美美李刚	如花	凤姐	林志玲郭美美	凤姐	林志玲凤姐	如花	老王	林志玲	郭美美	李刚

思路

根据好友列表,先找出直接好友,生成好友列表。然后再根据好友列表,找出fof。去除已经是好友的fof。

案例实现

1.Mapper1

我们要根据原始数据,得到直接好友和fof。还要借助getFof()方法保持一致性,“老王与小明”及“小明与老王”,应该算作是1个。

然后我们就在map端直接向外输出直接好友、fof,并根据输出的value的不同来做区分。

String user = key.toString();//key是经过输入格式化类处理过的,也就是用户自己			//制表符切割value得到数组,[老王,如花,林志玲],也就是用户自己的好友列表			String[] friends = value.toString().split("\t");			for (int i = 0; i < friends.length; i++) {    //迭代好友列表				String f1 = friends[i];				String userAndFriend = getFof(user, f1);//保证顺序的一致性				context.write(new Text(userAndFriend), new IntWritable(-1));//输出直接好友,如小明:老王|小明:如花|小明:林志玲				for (int j = i+1; j < friends.length; j++) {   					String f2 = friends[j];					String fof = getFof(f1, f2);					context.write(new Text(fof), new IntWritable(1));//输出fof,如老王:如花|老王:林志玲|如花:林志玲				}			}

2.Reducer1

数据经过map端的处理后,抵达reduce端。我们要做的就是去除已经是好友的fof,并进行统计。这里分组无需定义,KEY相同的自然就是一组了。两人是fof关系的同时,也有可能是直接好友,所以我们要加一个flag进行判断。通过对拿到的数据循环,一旦发现value为-1的,就舍弃不要,直接break。发现value为1的才进行累加统计。

比如:假设直接好友关系的“小明:老王”,还有fof关系的“小明:老王”,这两组KEY相同,但是value一个是-1一个是1。这一组进来到reduce方法后,我就可以拿着直接好友关系的“小明:老王”,去判断、筛查fof关系的“小明:老王”。一旦发现有这样的情况,就直接break了,并且设置flag为false。flag为false的时候,根本不会向外输出了。

int sum = 0;			boolean flag = true;//判断是否存在直接好友的fof			for(IntWritable i:iter){   				if (i.get()==-1) {   //是直接好友了					flag=false;					break;				}else {   					sum+=i.get();				}			}			if (flag) {   				context.write(key, new IntWritable(sum));			}

经过上述过程的“甄别”,真正的fof关系的好友就重新出炉了,且我们还给他们打上了“sum”的标记(sum越大,就证明越应该推荐他俩互加好友呗!)。

3.第一次MR结果

小明:凤姐	3李刚:小明	2林志玲:如花	3老王:如花	2老王:李刚	1老王:林志玲	2郭美美:如花	1郭美美:小明	1郭美美:李刚	2郭美美:老王	1

接下来,我们还要根据fof出现的次数来降序排序,并给出推荐列表。有了上面的结果,我们还要再来一波MR才行!此时,上面的输出数据就是下一个MR的数据源。

接下来,我们要根据数值来排序,根据用户来分组。根据经验,我们应该把(用户、数字)作为键(与天气案例中以年、月、温度为键类似),

4.Mapper2

经过第一轮MR,我们得到了诸如“小明:如花 3|李刚:小明 2”的结果。这样的数据会流进map端,被处理。由于数据要以数字排序,以用户分组,所以我们下一步想要得到的,就是“K(小明,3)”。小明和如花是fof关系,所以在推荐的时候,不光要把小明推荐给如花,还要把如花推荐给小明。因此,我们没读取一条数据,实际上是要输出两条的。以“小明:如花 3|李刚:小明 2”为例,经过处理后我们最想得到的就是:“①K(小明,3)V:如花 | ②K(如花,3)V:小明 | ③K(李刚,2) V:小明 | ④K(小明,2) V:李刚”

String[] users = key.toString().split(":");//将小明:凤姐按照“:”分割为数组			int count = Integer.parseInt(value.toString());//value就是3						//拼接K-V,使K为小明,3,V为凤姐。并且因为要相互推荐,所以要输出两条			Text k1 = new Text(users[0]+","+count);			Text v1 = new Text(users[1]);			context.write(k1, v1);						Text k2 = new Text(users[1]+","+count);			Text v2 = new Text(users[0]);			context.write(k2, v2);

经过map端处理后,K为(小明,3),V为凤姐。再往后,还要经历spill to dask,并调用排序算法进行排序。

5.SortComparator

先比较用户是否相同,用户相同在对value进行降序排序

//流进来的数据为:小明,3			String[] kk1 = k1.toString().split(",");			String[] kk2 = k2.toString().split(",");			int r1 = kk1[0].compareTo(kk2[0]);			if (r1==0){   				return -Integer.compare(Integer.parseInt(kk1[1]), Integer.parseInt(kk2[1]));			} 			return r1;

再往后还要经历分组

6.GroupComparator

分组只比较用户就可以了,结果直接return

//小明,3;按照“,”分割后,只比较用户			String[] kk1 =k1.toString().split(",");			String[] kk2 =k2.toString().split(",");			return kk1[0].compareTo(kk2[0]);

上述数据经历过分组后,“小明,3 如花 | 小明,2 李刚”就在一组了(因为我按照用户进行比较,也就是只比较小明),就要封装为迭代器(里面装的全是value,比如“小明组”里放的就是如花和李刚。目的就是为了让reduce能够迭代出来这些value)向reduce端传递了。我们期盼数据经过一次聚合后,得到“小明 如花3,李刚2”这样的。

7.Reducer2

reduce端开始对数据进行聚合,K:用户,V:推荐列表;循环取出值后,用StringBuffer进行拼接,并在最后去除最后一个制表符"\t"

StringBuffer sb =new StringBuffer();			//K:小明,3			String user =key.toString().split(",")[0];			for(Text v:iter){   //迭代小明这一组,得到所有的组员(如花、李刚),并append				sb.append(v.toString()).append("\t");			}			sb.substring(0, sb.length()-1);			context.write(new Text(user), new Text(sb.toString()));

最终,经历上述两次Job,我们得到了想要的结果:

凤姐	小明	如花	林志玲	老王	郭美美	小明	凤姐	李刚	郭美美	李刚	郭美美	小明	老王	林志玲	如花	老王	老王	林志玲	如花	李刚	郭美美	郭美美	李刚	如花	小明	老王

完整代码:

package com.husky.hadoop.fof;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class RunJob {   	public static void main(String[] args) {   		Configuration conf = new Configuration();		try {   			Job job = Job.getInstance();			FileSystem fs = FileSystem.get(conf);			job.setJobName("wc");			job.setJarByClass(RunJob.class);						job.setMapperClass(Mapper1.class);			job.setReducerClass(Reducer1.class);						job.setMapOutputKeyClass(Text.class);			job.setMapOutputValueClass(IntWritable.class);						job.setInputFormatClass(KeyValueTextInputFormat.class);			job.setNumReduceTasks(1);			//设置计算输入数据			FileInputFormat.addInputPath(job, new Path("/input/friend"));			//设置计算输出目录(mapreduce计算完成之后,最后的结果存放的目录)			Path outpath =new Path("/output/f1/");  //该目录必须不能存在,如果存在计算框架会出错			if(fs.exists(outpath)){   //如果存在该目录,则删除				fs.delete(outpath, true);			}			FileOutputFormat.setOutputPath(job, outpath);						//开始执行			boolean f =job.waitForCompletion(true);			if (f) {   				System.out.println("MR1执行成功!");				job =Job.getInstance(conf);				job.setJobName("fof");				job.setJarByClass(RunJob.class);								job.setMapperClass(Mapper2.class);				job.setReducerClass(Reducer2.class);								job.setMapOutputKeyClass(Text.class);				job.setMapOutputValueClass(Text.class);				job.setSortComparatorClass(SortComparator.class);				job.setGroupingComparatorClass(GroupComparator.class);//				job.setCombinerClass(WordCountReducer.class);//指定一个combiner的类								job.setNumReduceTasks(1); //设置reduce的数量				job.setInputFormatClass(KeyValueTextInputFormat.class);				//设置计算输入数据				FileInputFormat.addInputPath(job, new Path("/output/f1/"));				//设置计算输出目录(mapreduce计算完成之后,最后的结果存放的目录)				outpath =new Path("/output/f2/");  //该目录必须不能存在,如果存在计算框架会出错				if(fs.exists(outpath)){   //如果存在该目录,则删除					fs.delete(outpath, true);				}				FileOutputFormat.setOutputPath(job, outpath);								//开始执行				f =job.waitForCompletion(true);				if(f){   					System.out.println("MR2执行成功");				}			}					} catch (Exception e) {   			// TODO Auto-generated catch block			e.printStackTrace();		}	}	/**	 * 原始数据:小明	老王		如花		林志玲	 * 因为输入格式化类使用的是KeyValueTextInputFormat.class,因此key:小明;value:老王	如花		林志玲	 * */	static class Mapper1 extends Mapper
{ @Override protected void map(Text key, Text value,Context context) throws IOException, InterruptedException { String user = key.toString();//key是经过输入格式化类处理过的,也就是用户自己 //制表符切割value得到数组,[老王,如花,林志玲],也就是用户自己的好友列表 String[] friends = value.toString().split("\t"); for (int i = 0; i < friends.length; i++) { //迭代好友列表 String f1 = friends[i]; String userAndFriend = getFof(user, f1);//保证顺序的一致性 context.write(new Text(userAndFriend), new IntWritable(-1));//输出直接好友,如小明:老王|小明:如花|小明:林志玲 for (int j = i+1; j < friends.length; j++) { String f2 = friends[j]; String fof = getFof(f1, f2); context.write(new Text(fof), new IntWritable(1));//输出fof,如老王:如花|老王:林志玲|如花:林志玲 } } } } static class Reducer1 extends Reducer
{ @Override protected void reduce(Text key, Iterable
iter, Context context) throws IOException, InterruptedException { int sum = 0; boolean flag = true;//判断是否存在直接好友的fof for(IntWritable i:iter){ if (i.get()==-1) { //是直接好友了 flag=false; break; }else { sum+=i.get(); } } if (flag) { context.write(key, new IntWritable(sum)); } } } /** * 流进来的原始数据是:小明:凤姐 3 ,输入格式化类用的还是KeyValueTextInputFormat.class, * 所以制表符前面是map端的key,制表符后面是map端的value * */ static class Mapper2 extends Mapper
{ @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { String[] users = key.toString().split(":");//将小明:凤姐按照“:”分割为数组 int count = Integer.parseInt(value.toString());//value就是3 //拼接K-V,使K为小明,3,V为凤姐。并且因为要相互推荐,所以要输出两条 Text k1 = new Text(users[0]+","+count); Text v1 = new Text(users[1]); context.write(k1, v1); Text k2 = new Text(users[1]+","+count); Text v2 = new Text(users[0]); context.write(k2, v2); } } static class SortComparator extends WritableComparator{ //必须要有构造 public SortComparator(){ super(Text.class,true); } public int compare(WritableComparable a,WritableComparable b){ Text k1 = (Text)a; Text k2 = (Text)b; //流进来的数据为:小明,3 String[] kk1 = k1.toString().split(","); String[] kk2 = k2.toString().split(","); int r1 = kk1[0].compareTo(kk2[0]); if (r1==0){ return -Integer.compare(Integer.parseInt(kk1[1]), Integer.parseInt(kk2[1])); } return r1; } } static class GroupComparator extends WritableComparator{ public GroupComparator(){ super(Text.class,true); } public int compare(WritableComparable a,WritableComparable b){ Text k1 =(Text) a; Text k2 =(Text) b; //小明,3;按照“,”分割后,只比较用户 String[] kk1 =k1.toString().split(","); String[] kk2 =k2.toString().split(","); return kk1[0].compareTo(kk2[0]); } } static class Reducer2 extends Reducer
{ @Override protected void reduce(Text key, Iterable
iter, Context context) throws IOException, InterruptedException { StringBuffer sb =new StringBuffer(); //K:小明,3 String user =key.toString().split(",")[0]; for(Text v:iter){ //迭代小明这一组,得到所有的组员,并append sb.append(v.toString()).append("\t"); } sb.substring(0, sb.length()-1); context.write(new Text(user), new Text(sb.toString())); } } /** * 保证一致性的顺序 * */ static String getFof(String user1,String user2){ if (user1.compareTo(user2)>0) { return user1+":"+user2; }else { return user2+":"+user1; } }}
上一篇:Redis持久化
下一篇:MapReduce天气案例

发表评论

最新留言

很好
[***.229.124.182]2025年04月09日 01时53分51秒