Patrick

Reputation: 2719

Reading ES from spark with elasticsearch-spark connector: all the fields are returned

I've done some experiments in the spark-shell with the elasticsearch-spark connector. Invoking spark:

$ $SPARK_HOME/bin/spark-shell --master local[2] --jars ~/spark/jars/elasticsearch-spark-20_2.11-5.1.2.jar

In the scala shell:

scala> import org.elasticsearch.spark._
scala> val es_rdd = sc.esRDD("myindex/mytype",query="myquery")

It works well: the result contains the correct records, as specified in myquery. The only problem is that I get all the fields, even though I specify a subset of those fields in the query. Example:

myquery = """{"query":..., "fields":["a","b"], "size":10}"""

returns all the fields, not only a and b (by the way, I noticed that the size parameter is not taken into account either: the result contains more than 10 records). It may be worth adding that the fields are nested: a and b are actually doc.a and doc.b.

Is it a bug in the connector or do I have the wrong syntax?

Upvotes: 4

Views: 14555

Answers (3)

eliasah

Reputation: 40380

Used this way, the Spark Elasticsearch connector returns all of a document's fields, so you cannot apply a projection through the query.

If you want fine-grained control over what gets read, you should use DataFrames instead, which are basically RDDs plus a schema.

Predicate pushdown should also be enabled, so that Spark SQL is translated (pushed down) into the Elasticsearch Query DSL.

Now, a semi-complete example:

myQuery = """{"query":..., """
val df = spark.read.format("org.elasticsearch.spark.sql")
                     .option("query", myQuery)
                     .option("pushdown", "true")
                     .load("myindex/mytype")
                     .limit(10) // instead of size
                     .select("a","b") // instead of fields

Upvotes: 4

Thomas Decaux

Reputation: 22691

You want to restrict the fields returned by the Elasticsearch _search HTTP API? (I guess to improve download speed.)

First of all, use an HTTP proxy to see what the elasticsearch-hadoop plugin is doing (on macOS I use Apache Zeppelin with Charles Proxy). This will help you understand how pushdown works.

There are several solutions to achieve this:

1. DataFrame and pushdown

You specify the fields, and the plugin "forwards" them to ES (here via the _source parameter):

POST ../events/_search?search_type=scan&scroll=5m&size=50&_source=client&preference=_shards%3A3%3B_local

(-) Does not fully work for nested fields.

(+) Simple, straightforward, easy to read

2. RDD & query fields

With JavaEsSpark.esRDD, you can specify the fields inside the JSON query, as you did (a minimal sketch follows the pros and cons below). This only works with RDDs (with a DataFrame, the fields parameter is not sent).

(-) no DataFrame -> not the "Spark way"

(+) more flexible, more control
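
For completeness, a minimal sketch of option 2 with the Scala RDD API, reusing the index and nested field names from the question; the projection is expressed here as a _source filter inside the query body (whether the connector honors it may depend on its version):

import org.elasticsearch.spark._

// The query body carries the projection itself; doc.a and doc.b are the
// nested fields from the question and are only illustrative.
val query = """{"query": {"match_all": {}}, "_source": ["doc.a", "doc.b"]}"""
val es_rdd = sc.esRDD("myindex/mytype", query)
es_rdd.take(10).foreach(println)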

Upvotes: 0

Piotr Idzikowski

Reputation: 723

What about calling:

scala> val es_rdd = sc.esRDD("myindex/mytype", "myquery", Map("es.read.field.include" -> "a,b"))
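
The same connector setting can also be passed when reading through the DataFrame API, and nested fields are addressed with dot notation; a minimal sketch, assuming the doc.a / doc.b names from the question:

// es.read.field.include restricts the fields the connector reads;
// doc.a and doc.b are illustrative nested field names.
val df = spark.read.format("org.elasticsearch.spark.sql")
  .option("es.read.field.include", "doc.a,doc.b")
  .load("myindex/mytype")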

Upvotes: 2
