LaserJesus

Reputation: 8550

How do I run a query against Elasticsearch using PySpark without querying every node?

My end goal is to use PySpark to efficiently index a large volume of data in Elasticsearch (ES), then run a huge number of queries against the index and record statistics over the results.

Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0

Consider the following code:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

# create our Spark Context  
sc_conf = SparkConf().setAll((
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")

sc = SparkContext(conf=sc_conf)

sqlContext = SQLContext(sc)

q = """{
  "query": {
    "match_all": {}
  }
}"""

es_live_conf["es.query"] = q  # es_live_conf is defined further down

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_live_conf)

sqlContext.createDataFrame(es_rdd).limit(1).collect()

I'm just trying to run a match-all query against the index, and I only want the top result. I tried expressing the limit in the ES query itself, but Spark apparently ignores it, so I've expressed it with a DataFrame limit instead.

I've configured Spark as follows:

es_live_conf = {
    # specify the node that we are sending data to (this should be the master)
    "es.nodes": "xxx.xxxx.com",

    # specify the port in case it is not the default port
    "es.port": ES_PORT,

    # specify a resource in the form 'index/doc-type'
    "es.resource": "xxxxxx/document",

    "es.net.http.auth.user": ES_USERNAME,
    "es.net.http.auth.pass": ES_PASSWORD,

    "es.net.ssl": "true",
    "es.net.ssl.cert.allow.self.signed": "true",

    "es.nodes.discovery": "false",
    "es.nodes.wan.only": "true",
    "es.index.read.missing.as.empty": "true",
}

I'm accessing the ES cluster behind a VPC, so I only have access to the client nodes and not to the internal data and master nodes. That's why wan.only is set to true.

With this setup Spark seems to query every single node with a full match-all, then eventually consolidates down to the single result that I actually want. It's incredibly slow (50 shards, 30 million documents), and it completely bypasses ES's ability to efficiently reduce the results on each node itself. Even if I change the query to search for a single document ID, it runs the query against each individual shard, via the master node, by specifying a particular shard ID on each call. I tried setting es.nodes.client.only to true, but that complains that the setting conflicts with wan.only. If I enable client.only and disable wan.only, I can't connect to the cluster at all, because Spark then tries to connect to each node directly, and those nodes aren't accessible.

What am I doing wrong here? How do I use PySpark to run a query against ES once, rather than once per shard? And how can I make use of things like from, size, and rescore in my queries if PySpark runs the full query on every shard anyway and then seemingly post-processes the results?
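For comparison, this is the kind of single round trip I'm trying to get out of Spark: one _search request where ES applies from/size (and potentially rescore) server-side. The helper below only builds the request URL and body, so it shows the shape without touching a cluster (the host and index are the placeholders from my config, and port 9243 stands in for ES_PORT):

```python
import json

def build_search_request(host, port, index, doc_type, query,
                         start=0, size=1, use_ssl=True):
    """Build a single ES _search request so from/size are applied
    server-side instead of once per shard."""
    scheme = "https" if use_ssl else "http"
    url = "%s://%s:%s/%s/%s/_search" % (scheme, host, port, index, doc_type)
    body = dict(query)       # e.g. {"query": {"match_all": {}}}
    body["from"] = start     # ES paginates before returning hits
    body["size"] = size      # only the top `size` hits come back
    return url, json.dumps(body)

url, body = build_search_request(
    "xxx.xxxx.com", 9243, "xxxxxx", "document",
    {"query": {"match_all": {}}})
print(url)
print(body)
```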

Upvotes: 0

Views: 1143

Answers (1)

LaserJesus

Reputation: 8550

I couldn't find a way to solve this using the ES Hadoop library. It seems better suited to cases where you need to run a very long, very complex reduce step over the results of a single Elasticsearch query, rather than running millions of fast ES queries and aggregating the results. To solve this I ended up using this plugin: https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest

I actually developed it further so that each core can use multiple threads to execute even more requests in parallel. Not only does it essentially let you DDOS your ES cluster, it works against any RESTful endpoint on any platform where you need to issue and aggregate a large volume of requests.
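The per-core multi-threading is conceptually simple: fan the queries out over a thread pool and collect the results. A minimal sketch (the names and the fake fetch function are illustrative; in real use `fetch` would be a wrapper around a requests session hitting the REST endpoint):

```python
from concurrent.futures import ThreadPoolExecutor

def fan_out(fetch, queries, threads=8):
    """Run many independent queries in parallel and collect results in order.

    `fetch` is any callable that takes one query and returns its result,
    e.g. an HTTP call to an ES _search endpoint in real use."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(fetch, queries))

# Illustrative stand-in for a real HTTP call.
def fake_fetch(query):
    return {"query": query, "hits": len(query)}

results = fan_out(fake_fetch, ["alpha", "beta", "gamma"], threads=3)
total_hits = sum(r["hits"] for r in results)
```

On a Spark executor, each core would run a pool like this over its partition of the queries, so the total concurrency is cores times threads.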

If I can get the all-clear, I'll publish the multi-threaded version I created publicly on GitHub too.

Upvotes: 0
