Spark - Spark Streaming 阶段性总结(2017-05-11)
发布日期:2021-06-30 19:50:41 浏览次数:2 分类:技术文章

本文共 2375 字,大约阅读时间需要 7 分钟。

1. spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable

出现“task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。

解决这个问题最常用的方法有:

  • 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
  • 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
  • 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。将引用的类做成可序列化的。

2. SparkStreaming的Broadcast

object IPRuels {  // 将ip地址转换为整数  def ip2num(ip : String) : Long = {    val fragments = ip.split("\\.")    var ipNum = 0L    for (i <- 0 until fragments.length) {      // 与运算      ipNum = fragments(i).toLong | ipNum << 8L    }    ipNum  }  @volatile  private var instance : Broadcast[Array[(Long, Long, String)]] = null  def getInstance(sc: SparkContext): Broadcast[Array[(Long, Long, String)]] = {    if (instance == null) {      synchronized {        if (instance == null) {          val wordBlacklist = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line =>            // map RDD 的Transformation 操作,用 f 处理一个Rdd 的所有元素,将结果输出到另一个Rdd          {            val fields = line.trim().split("\t")            val start_num = ip2num(fields(0).trim())            val end_num = ip2num(fields(1).trim())            val province = fields(2).trim()            (start_num, end_num, province)          }).collect()          instance = sc.broadcast(wordBlacklist)        }      }    }    instance  }}
def IPLocation(dStream: DStream[String], sc : SparkContext, mongoSQL: MongoSQL): Unit = {    val ipRulesBroadcast = IPRuels.getInstance(sc)    val result = dStream.map(line => { ipPat.findFirstIn(line.toString()).mkString("")})      .map(ip => {        var info : Any = None        if(!ip.isEmpty) {          val ipNum = ip2num(ip)          val index = binarySearch(ipRulesBroadcast.value, ipNum)          info = ipRulesBroadcast.value(index)        }        (info, 1L)})      .reduceByKey(_+_)    result.foreachRDD((rdd : RDD[(Any, Long)]) => {      // 将total 写在map,for里      var total = 1L      try {         total = rdd.reduce((x, y) => ("t", x._2 + y._2))._2      } catch {        case  ex : UnsupportedOperationException => {          total = 1        }      }      val rowRdd = rdd.map(x => {        val r = x._2.toFloat/total        (x._1, r)      }).map(line => Row(line._1.toString, line._2.toString))      val schemaStr = "loc,rate"      mongoSQL.put(schemaStr, rowRdd, "IP_rate_2")    })  }
参考:

3. 保存3个好博客

转载地址:https://lipenglin.blog.csdn.net/article/details/71617046 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python - 利用Pandas对某app数据进行整理、分析并存入mongodb
下一篇:python - IP Proxies

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年05月02日 18时02分08秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章