ixaxaar

Reputation: 6769

Elasticsearch spark reading slow

Reading from Elasticsearch v6.2 into Spark using the prescribed Spark connector org.elasticsearch:elasticsearch-spark-20_2.11:6.3.2 is horrendously slow. This is from a 3-node ES cluster with the following index:

curl https://server/_cat/indices?v
health status index uuid                   pri rep docs.count  docs.deleted store.size pri.store.size
green  open   db    MmVwAwYfTz4eE_L-tncbwQ   5   1  199983131      9974871    105.1gb         51.8gb

Reading on a Spark cluster (10 nodes, 1 TB memory, >50 vCPUs):

val query = """{
  "query": {
    "match_all": {}
  }
}"""

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "server")
  .option("es.port", "443")
  .option("es.net.ssl", "true")
  .option("es.nodes.wan.only", "true")               // talk only to the declared node(s), no discovery
  .option("es.input.use.sliced.partitions", "false") // don't split shards into sliced scrolls
  .option("es.scroll.size", "1000")                  // documents per scroll request
  .option("es.read.field.include", "f1,f2,f3")       // fetch only these fields
  .option("es.query", query)
  .load("db")

df.take(1)

That took 10 minutes to execute. [Spark UI screenshot]

Is this how (slowly) it's supposed to work, or am I doing something wrong?

Upvotes: 2

Views: 945

Answers (1)

bechir nahali

Reputation: 11

This is not how slow it is supposed to be, and the answer can be found in the screenshot you shared:

The Stages: Succeeded/Total column in the Spark UI shows only one task running the read operation. That is surely not what you would expect; otherwise, what would be the point of having a whole cluster?

I have faced the same problem, and it took me a while to figure out that Spark associates one task (partition) with each shard of the Elasticsearch index; you can verify this directly, as shown below.
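A quick way to check this, assuming the df from the question (nothing else is assumed), is to ask Spark how many input partitions the read produced:

// Number of input partitions Spark created for this read.
// With the ES-Hadoop connector this matches the number of shards
// (or shard slices) being scanned, not the size of the Spark cluster.
val numPartitions = df.rdd.getNumPartitions
println(s"Input partitions: $numPartitions")

If this prints a small number, the read is bottlenecked on that many parallel scrolls, no matter how many executors are available.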

There we have our answer: to go faster, we need to parallelise the process. How? By distributing the source index across multiple shards.

By default, recent Elasticsearch versions create an index with a single shard (6.x defaulted to five), but the shard count can be set explicitly at creation time, as below:

PUT /index-name
{
  "settings": {
    "index": {
      "number_of_shards": x,
      "number_of_replicas": xx
    }
  }
}

The number of shards can be higher than the number of Elasticsearch nodes; this is all transparent to Spark. If the index already exists, its shard count cannot be changed in place, so create a new index with more shards and then copy the data over with the Elasticsearch Reindex API.
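A minimal sketch of that migration, assuming the source index db from the question and a hypothetical target index db-resharded created beforehand with the desired number_of_shards:

POST /_reindex
{
  "source": {
    "index": "db"
  },
  "dest": {
    "index": "db-resharded"
  }
}

Once the reindex finishes, point the Spark job's load() at the new index.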

I hope this solves your problem.

Upvotes: 1
