Spark - Spark Streaming 阶段性总结(2017-05-11)
参考:
发布日期:2021-06-30 19:50:41
浏览次数:2
分类:技术文章
本文共 2375 字,大约阅读时间需要 7 分钟。
1. spark出现task不能序列化错误的解决方法 org.apache.spark.SparkException: Task not serializable
出现“task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。
解决这个问题最常用的方法有:
- 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
- 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
- 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。将引用的类做成可序列化的。
2. SparkStreaming的Broadcast
object IPRuels { // 将ip地址转换为整数 def ip2num(ip : String) : Long = { val fragments = ip.split("\\.") var ipNum = 0L for (i <- 0 until fragments.length) { // 与运算 ipNum = fragments(i).toLong | ipNum << 8L } ipNum } @volatile private var instance : Broadcast[Array[(Long, Long, String)]] = null def getInstance(sc: SparkContext): Broadcast[Array[(Long, Long, String)]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line => // map RDD 的Transformation 操作,用 f 处理一个Rdd 的所有元素,将结果输出到另一个Rdd { val fields = line.trim().split("\t") val start_num = ip2num(fields(0).trim()) val end_num = ip2num(fields(1).trim()) val province = fields(2).trim() (start_num, end_num, province) }).collect() instance = sc.broadcast(wordBlacklist) } } } instance }}
def IPLocation(dStream: DStream[String], sc : SparkContext, mongoSQL: MongoSQL): Unit = { val ipRulesBroadcast = IPRuels.getInstance(sc) val result = dStream.map(line => { ipPat.findFirstIn(line.toString()).mkString("")}) .map(ip => { var info : Any = None if(!ip.isEmpty) { val ipNum = ip2num(ip) val index = binarySearch(ipRulesBroadcast.value, ipNum) info = ipRulesBroadcast.value(index) } (info, 1L)}) .reduceByKey(_+_) result.foreachRDD((rdd : RDD[(Any, Long)]) => { // 将total 写在map,for里 var total = 1L try { total = rdd.reduce((x, y) => ("t", x._2 + y._2))._2 } catch { case ex : UnsupportedOperationException => { total = 1 } } val rowRdd = rdd.map(x => { val r = x._2.toFloat/total (x._1, r) }).map(line => Row(line._1.toString, line._2.toString)) val schemaStr = "loc,rate" mongoSQL.put(schemaStr, rowRdd, "IP_rate_2") }) }
3. 保存3个好博客
转载地址:https://lipenglin.blog.csdn.net/article/details/71617046 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
表示我来过!
[***.240.166.169]2024年05月02日 18时02分08秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
flink 1-个人理解
2019-04-30
redis cli
2019-04-30
redis api
2019-04-30
flink physical partition
2019-04-30
java 解析json
2019-04-30
java http请求
2019-04-30
tensorflow 数据格式
2019-04-30
tf rnn layer
2019-04-30
tf input layer
2019-04-30
tf model create
2019-04-30
tf dense layer两种创建方式的对比和numpy实现
2019-04-30
tf initializer
2019-04-30
tf 从RNN到BERT
2019-04-30
tf keras SimpleRNN源码解析
2019-04-30
tf keras Dense源码解析
2019-04-30
tf rnn输入输出的维度和权重的维度
2019-04-30
检验是否服从同一分布
2019-04-30
tf callbacks
2019-04-30
keras、tf、numpy实现logloss对比
2019-04-30
Ubuntu20.04安装微信
2019-04-30