
项目实战从0到1之Spark(5)Spark整合Elasticsearch-从ES读取数据
发布日期:2021-05-14 00:16:47
浏览次数:17
分类:博客文章
本文共 768 字,大约阅读时间需要 2 分钟。
由于ES集群在拉取数据时可以提供过滤功能,因此在采用ES集群作为spark运算时的数据来源时,
根据过滤条件在拉取的源头就可以过滤了(ES提供过滤),就不必像从hdfs那样必须全部加载进spark的内存根据filter算子过滤,费时费力。代码:
import org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark._object Spark2Elasticsearch {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("Spark2ES").setMaster("local[2]")conf.set("es.nodes","hadoop1,hadoop2,hadoop3")conf.set("es.port","9200")conf.set("es.index.auto.create","true")val sc =new SparkContext(conf)val query:String =s"""{"query" : {"match_all" : {}},"filter" : {"term" : {"price" : 50.55}}}"""val rdd = sc.esRDD("store", query)println(rdd.collect().toBuffer)}}
运行结果:
采坑点:
那个sc.esRDD方法其实是ES提供的jar包里的一个隐试转换,在import org.elasticsearch.spark._这个包下,配置mavin依赖时注意spark的配套版本,本文1.6的spark依赖如下:
发表评论
最新留言
表示我来过!
[***.240.166.169]2025年04月08日 02时22分16秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Horizon Cloud之UAG访问异常
2019-03-12
vm无法打开电源
2019-03-12