
大数据学习之Spark——05RDD算子
发布日期:2021-05-06 17:19:31
浏览次数:29
分类:精选文章
本文共 4457 字,大约阅读时间需要 14 分钟。
RDD整体上分为Value类型和Key-Value类型
一. Transformation算子
Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
1. Value类型
1. map(func)
- 作用: 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
- 案例: 将原RDD中的元素都乘以2
2. flatMap(func)
- 作用: 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
- 案例:
3. groupBy(func)
- 作用: 分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
- 案例: 将原RDD中的元素按照奇偶进行分组
4. fliter(func)
- 作用: 过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
- 案例: 过滤掉原RDD中的所有奇数
5. sample(withReplacement, fraction, seed)
- 作用 随机抽样
- withReplacement: 表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
- fraction: 表示抽取的比例(约等于)
- seed: 用于指定随机数生成器种子。
- 案例
6. distinct([numTasks]))
- 作用 对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
- 案例: 去重
7. coalesce(numPartitions)
-
作用:
减少分区数, 用于大数据集过滤后,提高小数据集的执行效率。 -
案例:
8. repartition(numPartitions)
- 作用 根据分区数,重新通过网络随机洗牌所有数据。
- 案例
9. sortBy(func,[ascending], [numTasks])
-
作用
使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。 -
案例
2. 双Value类型交互
1. union(otherDataset)
- 作用 对源RDD和参数RDD求并集后返回一个新的RDD, 只是合并, 并不会去重
- 案例
- 实现去重(union + distinct)
2. subtract (otherDataset)
- 作用: 计算差集的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
- 案例
3. intersection(otherDataset)
- 作用 对源RDD和参数RDD求交集后返回一个新的RDD
- 案例
4. cartesian(otherDataset)
- 作用: 笛卡尔积
- 案例
5. zip(otherDataset)
-
作用:
将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。 -
案例:
3. Key - Value 类型
1. partitionBy
- 作用: 对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
- 案例:
2. groupByKey()
-
作用
groupByKey也是对每个key进行操作,但只生成一个sequence -
案例
-
代码:
val words: RDD[String] = sc.parallelize(Array("one", "two", "two", "three", "three", "three"))val mapByRDD: RDD[(String, Int)] = words.map((_, 1))val groupByDRR: RDD[(String, Iterable[Int])] = mapByRDD.groupByKey()groupByDRR.foreach(println(_))val result: RDD[(String, Int)] = groupByDRR.map(one => (one._1, one._2.sum))result.foreach(println(_))
3. reduceByKey(func, [numTasks])
-
作用:
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。 -
案例:
reduceByKey和groupByKey的区别
- reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
- groupByKey:按照key进行分组,直接进行shuffle。
4. sortByKey([ascending], [numTasks])
- 作用 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
- 案例
5. mapValues
- 作用: 针对于(K,V)形式的类型只对V进行操作
- 案例:
6. join(otherDataset, [numTasks])
-
作用
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD -
案例
7. cogroup(otherDataset, [numTasks])
-
作用:
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD 将key相同的数据聚合到一个迭代器 -
案例:
二. Action算子
Action类算子也是一类算子(函数)叫做行动算子。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
1. reduce(func)
- 作用: 通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
- 案例:
2. collect()
- 作用: 在驱动程序中,以数组的形式返回数据集的所有元素。
- 案例
3. count()
- 作用: 返回RDD中元素的个数
- 案例
4. take(n)
-
作用:
返回一个由RDD的前n个元素组成的数组 -
案例:
5. first()
- 作用: 返回RDD中的第一个元素 等价于: take(1)
- 案例:
6. takeOrdered(n)
- 作用: 返回该RDD排序后的前n个元素组成的数组
- 案例:
7. countByKey()
- 作用: 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
- 案例
8. foreach(func)
- 作用: 在数据集的每一个元素上,运行函数func进行更新
- 案例
三. 综合运用
-
题目
统计出每一个省份点击次数为前三的广告 -
数据格式:
时间戳,省份,城市,用户,广告,中间字段使用空格分割。时间戳 省份ID 城市ID 用户ID 广告ID1516609143867 6 7 64 161516609143869 9 4 75 181516609143869 1 7 87 12
-
代码
package com.hjf.coreimport org.apache.spark.rdd.RDDimport org.apache.spark.{ SparkConf, SparkContext}/** * @author Jiang锋时刻 * @create 2020-07-22 17:02 */object TestAgent { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setMaster("local").setAppName("Agent") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("Error") // 读取数据源 val lines: RDD[String] = sc.textFile("./data/agent.txt") // 将省份和广告ID组成key, 1设置为value val provinceAdToOne: RDD[((String, String), Int)] = lines.map(one => { val arr: Array[String] = one.split(" ") ((arr(1), arr(4)), 1) }) // 相同key的value值进行累加 val provinceAdToSum: RDD[((String, String), Int)] = provinceAdToOne.reduceByKey(_ + _) // 将身份设置为key, 广告ID和原value值组成新的value val provinceToAdSum: RDD[(String, (String, Int))] = provinceAdToSum.map(one => (one._1._1, (one._1._2, one._2))) // 相同key的数据进行分组 val provinceGroup: RDD[(String, Iterable[(String, Int)])] = provinceToAdSum.groupByKey() // 对每条数据的value值进行排序, 并只读取前3条数据 val top3: RDD[(String, List[(String, Int)])] = provinceGroup.mapValues( // 转成列表 one => one.toList.sortWith( // 比较value中的第二个元素 (x, y) => x._2 > y._2) // 读取前三条数据 .take(3)) top3.foreach(println(_)) sc.stop() }}
-
运行结果
声明:
- 本文参考了尚硅谷Spark课程的课件
发表评论
最新留言
留言是一种美德,欢迎回访!
[***.207.175.100]2025年03月21日 00时43分56秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
android:使用audiotrack 类播放wav文件
2019-03-05
聊聊我的五一小假期
2019-03-05
数据库三个级别封锁协议
2019-03-05
ACM/NCPC2016 C Card Hand Sorting(upc 3028)
2019-03-05
ubuntu学习笔记-常用文件、命令以及作用(hosts、vim、ssh)
2019-03-05
SLAM学习笔记-求解视觉SLAM问题
2019-03-05
普歌-允异团队-HashMap面试题
2019-03-05
还在一个一个手动安装虚拟机吗?Cobbler自动部署装机一键最小化安装打把游戏就好了
2019-03-05
程序员应该知道的97件事
2019-03-05
create-react-app路由的实现原理
2019-03-05
Linux环境变量配置错误导致命令不能使用(杂谈)
2019-03-05
openstack安装(九)网络服务的安装--控制节点
2019-03-05
shell编程(六)语言编码规范之(变量)
2019-03-05
vimscript学习笔记(二)预备知识
2019-03-05
Android数据库
2019-03-05
HTML基础,块级元素/行内元素/行内块元素辨析【2分钟掌握】
2019-03-05
STM8 GPIO模式
2019-03-05
omnet++
2019-03-05
23种设计模式一:单例模式
2019-03-05