Reputation: 6769
Reading from Elasticsearch v6.2
into Spark using the prescribed Spark connector org.elasticsearch:elasticsearch-spark-20_2.11:6.3.2
is horrendously slow. This is from a 3-node ES cluster with this index:
curl https://server/_cat/indices?v
green open db MmVwAwYfTz4eE_L-tncbwQ 5 1 199983131 9974871 105.1gb 51.8gb
Reading on a (10-node, 1 TB memory, >50 vCPUs) Spark cluster:
val query = """{
"query": {
"match_all": {}
}
}"""
val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "server")
  .option("es.port", "443")
  .option("es.net.ssl", "true")
  .option("es.nodes.wan.only", "true")
  .option("es.input.use.sliced.partitions", "false")
  .option("es.scroll.size", "1000")
  .option("es.read.field.include", "f1,f2,f3")
  .option("es.query", query)
  .load("db")

df.take(1)
That took 10 minutes to execute.
Is this how (slowly) it's supposed to work, or am I doing something wrong?
Upvotes: 2
Views: 945
Reputation: 11
This is not how slow it is supposed to be, and the answer can be found in the screenshot you shared:
The Stages: Succeeded/Total column
in the Spark UI shows only one task running the read operation. I don't think that is what you would expect; otherwise, what's the point of having a whole cluster?
I faced the same problem, and it took me a while to figure out that Spark assigns one task (one partition) to each shard of the Elasticsearch index.
So there is our answer: to go faster we need to parallelise the read, and the way to do that is to distribute the source index across multiple shards.
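As a quick check (a minimal sketch, reusing the df from your question), you can confirm how many partitions the connector actually created; with a single-shard index it will report 1:

// Number of Spark partitions created by the connector;
// this should match the number of shards in the source index.
val numPartitions = df.rdd.getNumPartitions
println(s"Read partitions: $numPartitions")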
By default, Elasticsearch creates an index with a single shard (since 7.0; 6.x versions default to five), but it is possible to customise this when the index is created, as below:
PUT /index-name
{
  "settings": {
    "index": {
      "number_of_shards": x,
      "number_of_replicas": xx
    }
  }
}
The number of shards can be higher than the number of Elasticsearch nodes; this is all transparent to Spark. If the index already exists, create a new index with more shards and then use the Elasticsearch Reindex API to copy the data over.
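As a rough sketch (the target name db_sharded is just an example, and the new index must already exist with the desired shard count), the reindex call looks like this:

POST /_reindex
{
  "source": { "index": "db" },
  "dest":   { "index": "db_sharded" }
}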
I hope this solves your problem.
Upvotes: 1