用Spark实现简单的单词统计
发布日期:2021-05-06 23:31:02 浏览次数:25 分类:精选文章

本文共 3074 字,大约阅读时间需要 10 分钟。

用Scala实现

RDD(可以简单理解为是一个list集合,里面放的就是读到的一行一行的数据)是spark中非常核心的内容,只有通过SparkContext才能创建出来RDD。

package com.husky.sparkimport org.apache.spark.rdd.RDDimport org.apache.spark.{   SparkConf, SparkContext}object SparkWordCount {     def main(args: Array[String]): Unit = {       //conf可以设置SparkApplication运行的资源、名称、还有Spark运行模式    val conf = new SparkConf()    conf.setAppName("wordcount")    conf.setMaster("local")//本地运行    //通往集群的唯一通道    val sc = new SparkContext(conf)    //去指定目录读文件,类型是RDD。RDD可以简单理解为是一个List集合,里面放的就是    //读到的一行一行的数据:hello world ni hao    //这里需要注意的是:inpath只会默认找到project一层,不会找到module    val lines: RDD[String] = sc.textFile("data/word")    //把一行数据切开,flatMap的特点就是一对多,用“ ”切割成单词    val words: RDD[String] = lines.flatMap(lines => {         lines.split(" ")    })    //借助二元组计数    val pairword: RDD[(String, Int)] = words.map(word=>{   new Tuple2(word,1)})    //reduceByKey是RDD的方法,1.将相同的key先分组;2.再针对每个组计算;(Int,Int)=>Int    val result: RDD[(String, Int)] = pairword.reduceByKey((v1:Int, v2:Int)=>{   v1+v2})    result.foreach(one=>{   println(one)})      }}

以上代码可以简化,one=>{println(one)}可以简化为:println。

匿名函数(v1:Int, v2:Int)=>{v1+v2}的v1和v2只使用了一次,可以用_代替,简化为:reduceByKey(+)
word=>{new Tuple2(word,1)}中的Tuple可new可不new,并且还可以只写一个括号。word只用了一次,用_表示,因此简化为map((_,1))
lines在map中只使用了一次,可以用_代替

val conf = new SparkConf().setAppName("wordcount").setMaster("local")val sc = new SparkContext(conf)sc.textFile("data/word").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)

利用Scala特性,可以大量的简化代码。但是缺点也很明显,可读性差

用Java实现

package com.husky.java;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;public class SparkWordCount4Java {       public static void main(String[] args) {           SparkConf conf = new SparkConf();        conf.setMaster("local");        conf.setAppName("wc");        JavaSparkContext sc = new JavaSparkContext(conf);        JavaRDD
lines = sc.textFile("data/word"); //进来一行,返回一个迭代器 JavaRDD
words = linesflatMap(new FlatMapFunction
() { @Override public Iterator
call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); }}); JavaPairRDD
pairWords = words.mapToPair(new PairFunction
() { //进来一个单词,出去一个K-V @Override public Tuple2
call(String s) throws Exception { return new Tuple2<>(s, 1); } }); JavaPairRDD
result = pairWords.reduceByKey(new Function2
() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }); //打印 result.foreach(new VoidFunction
>() { @Override public void call(Tuple2
tp) throws Exception { System.out.println(tp); } }); sc.stop(); }}
上一篇:Spark任务提交流程
下一篇:Redis哨兵机制

发表评论

最新留言

做的很好,不错不错
[***.243.131.199]2025年03月26日 23时14分16秒