Reputation: 2719
I've done some experiments in the spark-shell with the elasticsearch-spark connector. Invoking spark:
] $SPARK_HOME/bin/spark-shell --master local[2] --jars ~/spark/jars/elasticsearch-spark-20_2.11-5.1.2.jar
In the scala shell:
scala> import org.elasticsearch.spark._
scala> val es_rdd = sc.esRDD("myindex/mytype",query="myquery")
It works well: the result contains the right records, as specified in myquery. The only problem is that I get all the fields, even if I specify a subset of them in the query. Example:
myquery = """{"query":..., "fields":["a","b"], "size":10}"""
returns all the fields, not only a and b. (BTW, I noticed that the size parameter is not taken into account either: the result contains more than 10 records.) It may be important to add that the fields are nested: a and b are actually doc.a and doc.b.
Is it a bug in the connector or do I have the wrong syntax?
Upvotes: 4
Views: 14555
Reputation: 40380
The spark elasticsearch connector uses fields, thus you cannot apply projection.
If you wish to use fine-grained control over the mapping, you should be using DataFrame instead, which are basically RDDs plus schema.
The pushdown predicate should also be enabled to translate (push-down) Spark SQL into Elasticsearch Query DSL.
Now a semi-full example:
val myQuery = """{"query":...}"""
val df = spark.read.format("org.elasticsearch.spark.sql")
  .option("query", myQuery)
  .option("pushdown", "true")
  .load("myindex/mytype")
  .limit(10) // instead of size
  .select("a","b") // instead of fields
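To check what pushdown actually does, one option is to add a filter and look at the physical plan. This is only a sketch: it assumes a reachable Elasticsearch cluster on the connector's default address, the index from the question, and a made-up filter value ("some-value"). In spark-shell, use :paste so the chained calls are read as one expression.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell `spark` already exists; getOrCreate() returns that session.
val spark = SparkSession.builder()
  .appName("es-pushdown-sketch")
  .master("local[2]")
  .getOrCreate()

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("pushdown", "true") // let the connector translate Spark SQL filters
  .load("myindex/mytype")

// "some-value" is a hypothetical value, used only for illustration.
val filtered = df
  .filter(col("a") === "some-value")
  .select("a", "b")

// Prints the physical plan, where you can look for filters handed to the data source.
filtered.explain()
```

If the filter does not show up as handed to the source, Spark applies it after the documents have been downloaded.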
Upvotes: 4
Reputation: 22691
Do you want to restrict the fields returned by the Elasticsearch _search HTTP API? (I guess to improve download speed.)
First of all, use an HTTP proxy to see what the elasticsearch-hadoop plugin is doing (I use Apache Zeppelin with Charles proxy on macOS). This will help you understand how pushdown works.
There are several solutions to achieve this:
1. dataframe and pushdown
You select the fields on the DataFrame, and the plugin "forwards" them to ES (here via the _source parameter):
POST ../events/_search?search_type=scan&scroll=5m&size=50&_source=client&preference=_shards%3A3%3B_local
(-) Not fully working for nested fields.
(+) Simple, straightforward, easy to read
2. RDD & query fields
With JavaEsSpark.esRDD, you can specify fields inside the JSON query, like you did. This only works with RDDs (with a DataFrame, the fields parameter is not sent).
(-) No DataFrame, so not the usual Spark way
(+) more flexible, more control
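For the nested fields in the question (doc.a and doc.b), option 1 could look roughly like the sketch below. It assumes doc is mapped as an object, so Spark sees it as a struct column, and that a cluster is reachable on the connector's default address. Whether only these two fields are requested from ES is not guaranteed (see the caveat on nested fields above); the proxy is the way to check.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell `spark` already exists; getOrCreate() returns that session.
val spark = SparkSession.builder()
  .appName("es-nested-select-sketch")
  .master("local[2]")
  .getOrCreate()

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("pushdown", "true")
  .load("myindex/mytype")

// Dot notation reaches into the struct column; aliases flatten the result.
val projected = df.select(
  col("doc.a").as("a"),
  col("doc.b").as("b")
)

projected.printSchema() // shows the two flattened columns
projected.show(10)      // displays at most 10 rows
```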
Upvotes: 0
Reputation: 723
What about calling:
scala> val es_rdd = sc.esRDD("myindex/mytype",query="myquery", Map[String, String] ("es.read.field.include"->"a,b"))
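Spelled out for the nested fields from the question, this might look like the sketch below. The field names doc.a and doc.b come from the question; the match_all query is only a stand-in for the real one, and take(10) replaces the size parameter that the question found was ignored. I have not verified how this setting behaves with nested fields on connector 5.1.2, so treat it as something to try.

```scala
import org.elasticsearch.spark._

// Field names assumed from the question: a and b live under doc.
val wanted = Seq("doc.a", "doc.b")

// es.read.field.include takes a comma-separated list of field names.
val cfg = Map("es.read.field.include" -> wanted.mkString(","))

// Placeholder query for illustration; substitute the real Query DSL.
val myquery = """{"query":{"match_all":{}}}"""

// `sc` is the SparkContext that spark-shell provides.
val es_rdd = sc.esRDD("myindex/mytype", myquery, cfg)

// Each element is a pair: (document id, map of field name to value).
es_rdd.take(10).foreach { case (id, doc) => println(s"$id -> $doc") }
```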
Upvotes: 2