SparkRdd-scala版本
发布日期:2021-07-27 04:52:43 浏览次数:5 分类:技术文章

本文共 3591 字,大约阅读时间需要 11 分钟。

需知

介绍一下什么是rdd;rdd就是一个容器;List,set,map也是容器,区别:list,set,map是单机的,把数据放到内存中,内存有上限;rdd:里面存储的数据无上限;(因为它是集群,把所有的电脑的内存都连接起来)

容器里面的方法(CRUD);
Sql语句:也是操作容器里面的数据,而且功能特别强大;
所有的功能都集于rdd一身;
方法(算子);stream编程;

为什么要学习rdd

List,set,map是单机的,把数据放到内存中,内存有上限;

Mr:但是它比较慢;

Rdd

Rdd:无上限的容器,分布式容器

Rdd:(弹性分布式数据集)–>resilient distributed dataset
它是可以并行(多线程)操作的元素的容错(有错误可以修复)集合(容器)

五大特性:

A list of partitions:(分区列表)

A function for computing each split(计算每个分割的函数)
A list of dependencies on other RDDs;(对其他RDD的依赖关系列表)
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)(可选地,key - value RDD的分区器(例如,说RDD是散列分区的))
Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file)(可选地,要计算每个拆分的首选位置列表(例如一个HDFS文件)

Rdd操作

操作分为两类

Transformations(传输,转换)算子;返回值还是rdd(Stream)
Action(行为,行动);算子;;返回值还是非rdd(非Stream)
Transformations算子是懒惰的,必须得有一个action算子才能执行(数据流里面action算子只能调用一次,在spark里面可以调用多次)
Spark大哥和小弟之间传输的是代码,而不是数据(因为数据太大)
闭包不建议对闭包外面的变量进行修改(可以使用)

API操作

前期准备

/* java连接Hbase */      System.setProperty("hadoop.home.dir", "E:/帮助文档/大数据/hadoop-3.2.1");      /* Spark的程序;我们这是在Spark的基础之上进行二次开发 */      /* === SparkContext === */      /* 创建SparkConf       * bin/spark-submit 后面可以跟上一些参数       *  */      var conf = new SparkConf();      /* 设置一个大哥 */      conf.setMaster("local[*]");      conf.setAppName("MyHw");      /* 参数是SparkConf */      var sc = new SparkContext(conf);      /* 调用方法 */      //makeRdd1(sc);            /* 根据外部系统的文件集创建rdd */      makeRdd2(sc);            /* 一个jvm里面只能一个sc;当运行完以后,要调用停止的方法 */      sc.stop()
var arr = Array(10,20,15,30,50,60);    /* 创建一个rdd     * 参数1:Seq:List,Set,Map,数组     *  */    var arrRdd = sc.parallelize(arr);    /* 查看这个rdd里面的元素 */    var collectArr = arrRdd.collect() ;     println("collectArr:" + Arrays.toString(collectArr));    /* 获取元素 */    arrRdd.foreach( t => println("===foreach循环==" + t ));    /* 简化 */    arrRdd.foreach( println(_) );    println("===分区===");    /* 创建rdd的时候可以有多个分区(一个分区一个任务)     * 参数2:是分区     *  */    var arrParaRdd = sc.parallelize(arr, 2);    /* 查看元素 */    collectArr = arrParaRdd.collect() ;      println("collectArr:" + Arrays.toString(collectArr));    /* 获取元素 */    arrParaRdd.foreach( t => println("===foreach循环==" + t ));    /* 要想获取这个集合中的数据如何分区 */    arrParaRdd.foreachPartition( t =>    {      /* 查看类型 */      println("==foreachPartition==" + t.getClass.getSimpleName );      t.foreach( t1 =>       {        print("===循环==>" + t1 + "\t");       });      println("==结束===");    });     /* 查看分区数 */    println("==getNumPartitions===" + arrParaRdd.getNumPartitions) ;   }    /**   * 创建rdd   */  def makeRdd2(sc:SparkContext) : Unit =  {    println("==makeRdd2===");    /* 文件路径:绝对路径     * 默认是本地路径:file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data.txt     * 		如果是集群,保证这个文件在所有的集群上都要有一份;     * 		这里面的路径支持*通配符     * SequenceFiles:这个是hadoop,hdfs支持的文件格式     * hdfs:支持上hdfs的路径     * 保存     *  */    var path = "file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data.txt" ;     /* 参数1:文件的路径     * 参数2:最小的分区数     * 返回值是一个rdd:     * 		文件第一行当做rdd里面的元素     *  */    var textFileRdd = sc.textFile(path,3);    println("==textFileRdd==" + textFileRdd);    println("==getNumPartitions===" + textFileRdd.getNumPartitions) ;    /* 收集元素 */    var collectArr = textFileRdd.collect() ;     println("collectArr:" + collectArr);    /* 获取元素 */    textFileRdd.foreach( t => println("===foreach循环==" + t ));    /* 这个路径是一个目录,而不是文件夹 */    var tarPath = "file:/E:/班级/大数据_20191211/20191230_spark_rdd/02_代码/01-rdd/src/data_res.txt"		/* 将rdd存储到磁盘上,存储的是文本文件 */    textFileRdd.saveAsTextFile(tarPath);  }}

转载地址:https://blog.csdn.net/qq_45292079/article/details/103846491 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:spark-Json、jdbc操作、hive本地版
下一篇:SparkSql

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年09月29日 22时09分12秒