Spark - 利用 Spark SQL + MongoDB 对PandaTV主播进行等级分类
发布日期:2021-06-30 19:50:40 浏览次数:2 分类:技术文章

本文共 4481 字,大约阅读时间需要 14 分钟。

Spark SQL

使用Spark SQL时,最主要的两个组件就是DataFrame和SQLContext。

1. DataFrame

DataFrame是一个分布式的,按照命名列的形式组织的数据集合。与关系型数据库中的数据库表类似。通过调用将DataFrame的内容作为行RDD(RDD of Rows)返回的rdd方法,可以将DataFrame转换成RDD。
可以通过如下数据源创建DataFrame:
  • 已有的RDD
  • 结构化数据文件
  • JSON数据集
  • Hive表
  • 外部数据库

2. SQL Context

Spark SQL提供SQLContext封装Spark中的所有关系型功能。
此外,Spark SQL中的HiveContext可以提供SQLContext所提供功能的超集。可以在用HiveQL解析器编写查询语句以及从Hive表中读取数据时使用。在Spark程序中使用HiveContext无需既有的Hive环境

Spark + Mongo的应用场景

  • 对那些已经使用MongoDB的用户,如果你希望在你的MongoDB驱动的应用上提供个性化功能,比如说像Yahoo一样为你找感兴趣的新闻,能够在你的MongoDB数据上利用到Spark强大的机器学习或者流处理,你就可以考虑在MongoDB集群上部署Spark来实现这些功能。
  • 如果你已经使用Hadoop而且数据已经在HDFS里面,你可以考虑使用Spark来实现更加实时更加快速的分析型需求,并且如果你的分析结果有数据量大、格式多变以及这些结果数据要及时提供给前台APP使用的需求,那么MongoDB可以用来作为你分析结果的一个存储方案。
原文:

RDD、DataFrame和DataSet的区别

RDD和DataFrame

RDD和DataSet

DataFrame和DataSet

Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:
原文:

练习:根据熊猫TV房间订阅人数对主播进行分类

  • 原始数据为熊猫tv英雄联盟类别直播间信息,共[7257条],部分如下
/* 1 */{    "_id" : ObjectId("58e77b94a9231916402a4d8b"),    "r_id" : "511386",    "r_name" : "柯柯:6K把德莱文电一王者局",    "r_classification" : {        "cname" : "英雄联盟",        "ename" : "lol"    },    "u_name" : "SteinsGate1",    "u_avatar_url" : "http://i7.pdim.gs/1a1399db3dd5dde23d673204c34d3e89.jpeg",    "time" : "2017-04-07 19:44:20",    "u_follower" : 4896}/* 2 */{    "_id" : ObjectId("58e77b94a9231916402a4d8c"),    "r_id" : "419546",    "r_name" : "大根:国服第一龙王!",    "r_classification" : {        "cname" : "英雄联盟",        "ename" : "lol"    },    "u_name" : "龙大大大大大根",    "u_avatar_url" : "http://i6.pdim.gs/4c0a0e43d7956e9dd8aa8c07df6acb7b.png",    "time" : "2017-04-07 19:44:20",    "u_follower" : 91682}/* 3 */{    "_id" : ObjectId("58e77b94a9231916402a4d8d"),    "r_id" : "7000",    "r_name" : "LPL春季赛Snake vs RNG",    "r_classification" : {        "cname" : "英雄联盟",        "ename" : "lol"    },    "u_name" : "LPL熊猫官方直播",    "u_avatar_url" : "http://i5.pdim.gs/4558e050a485b215de8474e3ab64d04e.png",    "time" : "2017-04-07 19:44:20",    "u_follower" : 846436}
  • 从mongodb中读取原始数据之后,根据"u_follower"对主播进行分类,类别如下
  1. 100 <-> 1000 
  2. 1000 <- >1w
  3. 1w <-> 10w
  4. 10w <-> 100w
  5. 100w <-> 1000w
  • 实现如下
import com.mongodb.spark.MongoSparkimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.{Row, SQLContext}import org.apache.spark.sql.types.{StringType, StructField, StructType}/**  * Created by peerslee on 17-4-20.  */object MongoSQL {  def main(args: Array[String]): Unit = {    // 1. 创建SparkContext -> spark 入口    val conf = new SparkConf()      .setMaster("local[*]")      .setAppName("MongoSQL")      .set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/lol.panda")      .set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/lol")    val sc = new SparkContext(conf)    // 2. 创建SQLContext    val sqlContext = SQLContext.getOrCreate(sc)    // 3. 使用MongoSpark创建Dataframe(其他方式创建见文档)    val df = MongoSpark.load(sqlContext)//    df.show()    // 4. 使用filter根据"u_follower"对直播间进行分类    var border = 100    /*    [rdd => DataFrame]    1. 推断Schema    原理: Spark SQL能够将含Row对象的RDD转换成DataFrame,并推断数据类型     */    val schemaStr = "id,follower,rank"    val schema = StructType(schemaStr.split(",")      .map(column => StructField(column, StringType, true)))    for(i <- 1 to 5) {//      val rowRdd = df.filter(df("u_follower") > border && df("u_follower") < border*10).show()      /*      2. 指定Schema      2.1 从原来的RDD创建一个元祖或列表的RDD      2.2 用StructType 创建一个和步骤一中创建的RDD中元组或列表的结构相匹配的Schema      2.3 通过SQLContext提供的createDataFrame方法将schema 应用到RDD上       */      val rowRdd = df.filter(df("u_follower") > border && df("u_follower") < border*10)        .map(room => Row(room(2), room(6).toString, i.toString))        .sortBy(room => room(1).toString.toLong, false)      val rankDF = sqlContext.createDataFrame(rowRdd, schema)      MongoSpark.save(rankDF.write.option("collection", "panda_rank").mode("append"))      border *= 10    }  }}
  • rank 库共[1771]条数据,部分如下
/* 1 */{    "_id" : ObjectId("58f84fbfa830984cc36ef9e0"),    "id" : "6666",    "follower" : "3998068",    "rank" : "5"}
/* 1 */{    "_id" : ObjectId("58f84fbea830984cc36ef9b8"),    "id" : "16666",    "follower" : "974142",    "rank" : "4"}
/* 1 */{    "_id" : ObjectId("58f84fbea830984cc36ef915"),    "id" : "246868",    "follower" : "99866",    "rank" : "3"}
/* 1 */{    "_id" : ObjectId("58f84fbea830984cc36ef558"),    "id" : "357838",    "follower" : "9980",    "rank" : "2"}
/* 1 */{    "_id" : ObjectId("58f84fbda830984cc36ef2fb"),    "id" : "633473",    "follower" : "999",    "rank" : "1"}
  • 等级5的所有主播
参考文档:

转载地址:https://lipenglin.blog.csdn.net/article/details/70255939 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python - IP Proxies
下一篇:Subtext3 - 安装配置

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2024年04月21日 22时22分40秒