本文共 4884 字,大约阅读时间需要 16 分钟。
介绍
Spark SQL是一个用于结构化数据处理的SPark模块。
Datasets and DataFrames:
DataSet:数据集是数据的分布式集合;它提供了RDDs(强大的类型,能够使用强大的lambda函数)的优点,以及SparkSQL的优化执行引擎的优点
DataFrame是数据集(DataSet)组织成有名字的列;它在概念上相当于关系数据库中的表;只有Scala中才会使用到dataFrame;而在Java中使用的是DataSetAPI操作
前期准备
//告诉系统,hadoop在哪System.setProperty("hadoop.home.dir", "E:/帮助文档/大数据/hadoop-3.2.1"); /* * 返回值是Builder * */ var builder = SparkSession.builder(); /* 设置一些参数 */ builder.appName("MySql"); var conf = new SparkConf(); /* 在本地跑 */ conf.setMaster("local[*]"); /* 传入一个config对象 */ builder.config(conf); /* 单个单个的传 */ //builder.config(key, value) /* 设置Spark自带的默认的仓库路径 */ //builder.config("hive.metastore.warehouse.dir", "e:/test/warehouse"); /* 初始化SparkSession */ var spark = builder.getOrCreate() ; /* 获取sparkContext * 千万不要new一个sparkContext和SparkSession * */ var sc = spark.sparkContext ; /* 完成 */ println("初始化完成"); spark.stop();
主要方法
传入参数为SparkContext、SparkSession
def test01(sc:SparkContext,ss:SparkSession){
加载数据(DataFrame.read.load(path))(默认是parquet格式文件)
在Sparkrepo中的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”找到完整的示例代码,将其拷贝至eclipse项目下。
var path = "./users.parquet" /*加载数据源 * load加载的是文件路径 * 加载和存储的格式默认就是parquet * 返回值是users.parquet,类似于Rdd,但是比他高级 * */ var dataframe = ss.read.load(path)
加载其他数据类型的文件(json)
var path = "./people.json" //读取json的文件 var dataFrame = ss.read.format("json").load(path) //也可以这样 var dataFrame =ss.read.json(path)
加载csv的文件
//两种加载文件的方法//第一种var DataFrameReader = ss.read.format("csv")//第二种var csvDataFrame =ss.read.csv(path) //csv文件默认是以,分隔,但是如果文件是以;分隔,故需要重新设置 DataFrameReader.option("sep", ";")
处理csv文件需要额外操作
假使什么都不去设置,打印出来就是如下
println("csvDataFrame.printSchema()") csvDataFrame.printSchema() |-- _c0: string (nullable = true) |-- _c1: string (nullable = true) |-- _c2: string (nullable = true) println("csvDataFrame.show") csvDataFrame.show() +-----+---+---------+| _c0|_c1| _c2|+-----+---+---------+| name|age| job||Jorge| 30|Developer|| Bob| 32|Developer
所以还需要额外设置
//以上观之,将表头当作数据了,这样不可,需要将表头给拎出来 /* true:自动的推断数据的类型 * false:不自动推荐数据的类型;默认全部是String */ DataFrameReader.option("inferSchema", "true") /* csv文件的第一行是不是数据;true:表示不是数据,是表头; * false:表示的是数据,不是表头 */ DataFrameReader.option("header", "true") var csvDataFrame = DataFrameReader.load(path) println("csvDataFrame.printSchema()") csvDataFrame.printSchema() println("csvDataFrame.show")
打印表的相关内容printSchema、show
println("dataframe" + dataframe) println("printSchema") //打印dataframe的结构 dataframe.printSchema() println("show") //查看前20条数据 dataframe.show()
只检索自己想要的字段DataFrame.select(字段)
var selectdataframe = dataframe.select("favorite_numbers")
进行sql操作
与使用ReadAPI将文件加载到DataFrame并对其进行查询不同,还可以直接使用SQL查询该文件
//第一种操作//注意是``,而非'' var sql = "select * from json.`./people.json`" var SqlDataFrame = ss.sql(sql) //第二种操作 //使用方法进行加载 var path = "./people.json" var JsondataFrame = ss.read.json(path) //第三种操作 //创建一张临时表 //注意是createOrReplaceTempView // 而不是createGlobalTempView JsondataFrame.createOrReplaceTempView("peo") //sql语句 var Jsonsql = "select name from peo where age > 20" var JsonsqlDataFrame = ss.sql(Jsonsql) println("JsonsqlDataFrame.printSchema()") JsonsqlDataFrame.printSchema() println("JsonsqlDataFrame.show") JsonsqlDataFrame.show()
存储数据DataFrame.write.save
//保留数据 var respath = path + "_result" selectdataframe.write.save(respath)
存储为其他类型文件
//文件存储,以另外一种方式存储 var respath = "./people" dataFrame.write.format("parquet").save(respath) //也可这样 dataFrame.write.parquet(respath)
保存方式
保存操作可以选择SaveMode,它指定如何处理现有数据(如果存在)。重要的是要认识到,这些保存模式不使用任何锁定,也不是原子的。此外,在执行Overwrite,在写出新数据之前,数据将被删除。
/* 保存模式 * SaveMode.ErrorIfExists,若文件存在,则报错 * SaveMode.Append 若文件存在,则追加 * SaveMode.Overwrite若文件存在,则覆盖 * SaveMode.Ignore:若文件存在,则忽略 * */ var respath = csvPath + "_res" dataFrame.write.mode(SaveMode.Overwrite).json(respath)
保存到持久化表
DataFrames还可以使用saveAsTable命令。注意,现有的Hive部署并不是使用此特性所必需的。spark将为您创建一个默认的本地Hive转移(使用Derby)。不像createOrReplaceTempView命令,saveAsTable将显现DataFrame的内容,并创建一个指向Hive转移区中数据的指针。即使spark重新启动之后,持久化表仍然存在,只要您保持的连接是同一元数据。可以通过调用table方法的SparkSession的table方法。
对于基于文件的数据源,例如文本、拼图、json等,您可以通过path选项,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”)…当表被删除时,自定义表路径将不会被移除,并且表数据仍然存在。如果未指定自定义表路径,SPark将数据写入仓库目录下的默认表路径。当表被删除时,默认的表路径也将被删除。
从Spark2.1开始,持久性数据源表的每个分区元数据都存储在Hive的元数据中。这带来了几个好处:
由于元数据只能返回查询所需的分区,因此不再需要在第一个查询中发现表中的所有分区。
单元DDL,如ALTER TABLE PARTITION … SET LOCATION现在可用于使用Datasource API创建的表。 注意,在创建外部数据源表(那些具有path备选方案)。要同步元数据中的分区信息,可以调用MSCK REPAIR TABLE.//保存到hive //dataFrame.write.format("json").saveAsTable("temp") 上面指令先执行,将数据保存在spark-warehouse/temp下
加载数据,就要read(读),然后load(加载),数据被处理为dataframe 保存数据:就要write(写),然后save(保存),生成的是文件夹
转载地址:https://blog.csdn.net/qq_45292079/article/details/103845694 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!