大数据学习之Spark——05RDD算子
发布日期:2021-05-06 17:19:31 浏览次数:29 分类:精选文章

本文共 4457 字,大约阅读时间需要 14 分钟。

RDD整体上分为Value类型和Key-Value类型

一. Transformation算子

Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

1. Value类型

1. map(func)
  1. 作用:
    返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
  2. 案例:
    将原RDD中的元素都乘以2
    在这里插入图片描述
2. flatMap(func)
  1. 作用:
    类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
  2. 案例:
    在这里插入图片描述
3. groupBy(func)
  1. 作用:
    分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
  2. 案例:
    将原RDD中的元素按照奇偶进行分组
    在这里插入图片描述
4. fliter(func)
  1. 作用:
    过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
  2. 案例: 过滤掉原RDD中的所有奇数
    在这里插入图片描述
5. sample(withReplacement, fraction, seed)
  1. 作用
    随机抽样
    1. withReplacement: 表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
    2. fraction: 表示抽取的比例(约等于)
    3. seed: 用于指定随机数生成器种子。
  2. 案例
    在这里插入图片描述
6. distinct([numTasks]))
  1. 作用
    对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
  2. 案例: 去重
    在这里插入图片描述
7. coalesce(numPartitions)
  1. 作用:

    减少分区数, 用于大数据集过滤后,提高小数据集的执行效率。

  2. 案例:

    在这里插入图片描述

8. repartition(numPartitions)
  1. 作用
    根据分区数,重新通过网络随机洗牌所有数据。
  2. 案例
    在这里插入图片描述
9. sortBy(func,[ascending], [numTasks])
  1. 作用

    使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

  2. 案例

    在这里插入图片描述

2. 双Value类型交互

1. union(otherDataset)
  1. 作用
    对源RDD和参数RDD求并集后返回一个新的RDD,
    只是合并, 并不会去重
  2. 案例
    在这里插入图片描述
  3. 实现去重(union + distinct)
    在这里插入图片描述
2. subtract (otherDataset)
  1. 作用:
    计算差集的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
  2. 案例
    在这里插入图片描述
3. intersection(otherDataset)
  1. 作用
    对源RDD和参数RDD求交集后返回一个新的RDD
  2. 案例
    在这里插入图片描述
4. cartesian(otherDataset)
  1. 作用:
    笛卡尔积
  2. 案例
    在这里插入图片描述
5. zip(otherDataset)
  1. 作用:

    将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

  2. 案例:在这里插入图片描述

3. Key - Value 类型

1. partitionBy
  1. 作用:
    对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。
  2. 案例:
    在这里插入图片描述
2. groupByKey()
  1. 作用

    groupByKey也是对每个key进行操作,但只生成一个sequence

  2. 案例

    在这里插入图片描述

  3. 代码:

    val words: RDD[String] = sc.parallelize(Array("one", "two", "two", "three", "three", "three"))val mapByRDD: RDD[(String, Int)] = words.map((_, 1))val groupByDRR: RDD[(String, Iterable[Int])] = mapByRDD.groupByKey()groupByDRR.foreach(println(_))val result: RDD[(String, Int)] = groupByDRR.map(one => (one._1, one._2.sum))result.foreach(println(_))

    在这里插入图片描述

3. reduceByKey(func, [numTasks])
  1. 作用:

    在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

  2. 案例:

    在这里插入图片描述

reduceByKey和groupByKey的区别

  1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
  2. groupByKey:按照key进行分组,直接进行shuffle。
4. sortByKey([ascending], [numTasks])
  1. 作用
    在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
  2. 案例
    在这里插入图片描述
5. mapValues
  1. 作用:
    针对于(K,V)形式的类型只对V进行操作
  2. 案例:
    在这里插入图片描述
6. join(otherDataset, [numTasks])
  1. 作用

    在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

  2. 案例

    在这里插入图片描述

7. cogroup(otherDataset, [numTasks])
  1. 作用:

    在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    将key相同的数据聚合到一个迭代器

  2. 案例:

    在这里插入图片描述

二. Action算子

Action类算子也是一类算子(函数)叫做行动算子。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

1. reduce(func)
  1. 作用:
    通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
  2. 案例:
    在这里插入图片描述
2. collect()
  1. 作用:
    在驱动程序中,以数组的形式返回数据集的所有元素。
  2. 案例
    在这里插入图片描述
3. count()
  1. 作用:
    返回RDD中元素的个数
  2. 案例
    在这里插入图片描述
4. take(n)
  1. 作用:

    返回一个由RDD的前n个元素组成的数组

  2. 案例:

    在这里插入图片描述

5. first()
  1. 作用:
    返回RDD中的第一个元素
    等价于: take(1)
  2. 案例:
    在这里插入图片描述
6. takeOrdered(n)
  1. 作用:
    返回该RDD排序后的前n个元素组成的数组
  2. 案例:
    在这里插入图片描述
7. countByKey()
  1. 作用:
    针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
  2. 案例
    在这里插入图片描述
8. foreach(func)
  1. 作用:
    在数据集的每一个元素上,运行函数func进行更新
  2. 案例
    在这里插入图片描述

三. 综合运用

  1. 题目

    统计出每一个省份点击次数为前三的广告

  2. 数据格式:

    时间戳,省份,城市,用户,广告,中间字段使用空格分割。

    时间戳	省份ID	城市ID	用户ID	广告ID1516609143867 6 7 64 161516609143869 9 4 75 181516609143869 1 7 87 12
  3. 代码

    package com.hjf.coreimport org.apache.spark.rdd.RDDimport org.apache.spark.{     SparkConf, SparkContext}/** * @author Jiang锋时刻 * @create 2020-07-22 17:02 */object TestAgent {       def main(args: Array[String]): Unit = {         val conf: SparkConf = new SparkConf()    conf.setMaster("local").setAppName("Agent")    val sc: SparkContext = new SparkContext(conf)    sc.setLogLevel("Error")    // 读取数据源    val lines: RDD[String] = sc.textFile("./data/agent.txt")    // 将省份和广告ID组成key, 1设置为value    val provinceAdToOne: RDD[((String, String), Int)] = lines.map(one => {           val arr: Array[String] = one.split(" ")      ((arr(1), arr(4)), 1)    })    // 相同key的value值进行累加    val provinceAdToSum: RDD[((String, String), Int)] = provinceAdToOne.reduceByKey(_ + _)    // 将身份设置为key, 广告ID和原value值组成新的value    val provinceToAdSum: RDD[(String, (String, Int))] = provinceAdToSum.map(one => (one._1._1, (one._1._2, one._2)))    // 相同key的数据进行分组    val provinceGroup: RDD[(String, Iterable[(String, Int)])] = provinceToAdSum.groupByKey()    // 对每条数据的value值进行排序, 并只读取前3条数据    val top3: RDD[(String, List[(String, Int)])] = provinceGroup.mapValues(      // 转成列表      one => one.toList.sortWith(        // 比较value中的第二个元素        (x, y) => x._2 > y._2)        // 读取前三条数据        .take(3))    top3.foreach(println(_))    sc.stop()  }}
  4. 运行结果

    在这里插入图片描述

声明:

  1. 本文参考了尚硅谷Spark课程的课件
上一篇:大数据学习之Spark——06RDD持久化(控制算子)
下一篇:大数据学习之Spark——04RDD概述及创建

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2025年03月21日 00时43分56秒