项目实战从0到1之Spark(5)Spark整合Elasticsearch-从ES读取数据
发布日期:2021-05-14 00:16:47 浏览次数:17 分类:博客文章

本文共 768 字,大约阅读时间需要 2 分钟。

由于ES集群在拉取数据时可以提供过滤功能,因此在采用ES集群作为spark运算时的数据来源时,

根据过滤条件在拉取的源头就可以过滤了(ES提供过滤),就不必像从hdfs那样必须全部加载进spark的内存根据filter算子过滤,费时费力。

代码:

import org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.spark._object Spark2Elasticsearch {def main(args: Array[String]): Unit = {val conf =new SparkConf().setAppName("Spark2ES").setMaster("local[2]")conf.set("es.nodes","hadoop1,hadoop2,hadoop3")conf.set("es.port","9200")conf.set("es.index.auto.create","true")val sc =new SparkContext(conf)val query:String =s"""{"query" : {"match_all" : {}},"filter" : {"term" : {"price" : 50.55}}}"""val rdd = sc.esRDD("store", query)println(rdd.collect().toBuffer)}}

运行结果:

采坑点:

那个sc.esRDD方法其实是ES提供的jar包里的一个隐试转换,在import org.elasticsearch.spark._这个包下,
配置mavin依赖时注意spark的配套版本,本文1.6的spark依赖如下:

上一篇:项目实战从0到1之Spark(6)Spark 读取mysql中的数据
下一篇:项目实战从0到1之Spark(4)SparkSQL读取HBase数据

发表评论

最新留言

表示我来过!
[***.240.166.169]2025年04月08日 02时22分16秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章