Reputation: 83
I've been experimenting with Apache Spark to see if it can be used to make an analysis engine for data we have stored in an Elasticsearch cluster. I've found that with any significant RDD size (i.e. several million records), even the simplest operations take more than a minute.
For example, I made this simple test program:
package es_spark;

import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class Main {
    public static void main(String[] pArgs) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.set("es.nodes", pArgs[0]);
        JavaSparkContext sc = new JavaSparkContext(conf);

        long start = System.currentTimeMillis();

        // Load the whole "test3" index as an RDD of (document id, source map) pairs.
        JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(sc, "test3");
        long numES = esRDD.count();
        long loadStop = System.currentTimeMillis();

        // Trivial map that ignores each document and returns a constant.
        JavaRDD<Integer> dummyRDD = esRDD.map(pair -> 1);
        long numDummy = dummyRDD.count();
        long mapStop = System.currentTimeMillis();

        System.out.println("ES Count: " + numES);
        System.out.println("ES Partitions: " + esRDD.getNumPartitions());
        System.out.println("Dummy Count: " + numDummy);
        System.out.println("Dummy Partitions: " + dummyRDD.getNumPartitions());
        System.out.println("Data Load Took: " + (loadStop - start) + "ms");
        System.out.println("Dummy Map Took: " + (mapStop - loadStop) + "ms");

        sc.stop();
        sc.close();
    }
}
I've run this on a Spark cluster with 3 slaves, each with 14 cores and 49.0 GB of RAM, using the following command:
./bin/spark-submit --class es_spark.Main --master spark://<master_ip>:7077 ~/es_spark-0.0.1.jar <elasticsearch_main_ip>
The output is:
ES Count: 8140270
ES Partitions: 80
Dummy Count: 8140270
Dummy Partitions: 80
Data Load Took: 108059ms
Dummy Map Took: 104128ms
It takes 1.5+ minutes to perform the dummy map job on the 8+ million records. I find this performance surprisingly low given that the map job does nothing. Am I doing something wrong or is this about normal performance for Spark?
I've also tried twiddling the --executor-memory and --executor-cores settings without much difference.
Upvotes: 0
Views: 1107
Reputation: 35219
"I find this performance surprisingly low given that the map job does nothing."
The map job doesn't do nothing: it has to fetch the complete dataset from Elasticsearch. Because the data is not cached, this happens twice, once for each action. So overall you are measuring the cost of pulling the full dataset out of Elasticsearch for each count, plus some secondary things like initialization time.
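If you want the second count to reflect only the map itself, cache the RDD after loading it. A minimal sketch based on the question's own code (the storage level in the comment is just an example):

// Cache the RDD so the second action reuses the partitions already fetched
// from Elasticsearch instead of re-reading the whole index over the network.
JavaPairRDD<String, Map<String, Object>> esRDD =
        JavaEsSpark.esRDD(sc, "test3").cache();   // or .persist(StorageLevel.MEMORY_AND_DISK())

long numES = esRDD.count();                       // first action: reads from ES and fills the cache
long numDummy = esRDD.map(pair -> 1).count();     // second action: served from the cached partitions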
Upvotes: 1
Reputation: 6154
Generally unless you see OOM failures or significant GC or spilling to disk as a bottleneck, it's not worth changing the executor memory. When you do change it, you should also decrease the spark.memory.fraction. For your job it's highly unlikely to help.
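For completeness, both knobs are just spark-submit options; something along these lines, where the values are placeholders rather than recommendations:

./bin/spark-submit --class es_spark.Main --master spark://<master_ip>:7077 \
    --executor-memory 8g \
    --conf spark.memory.fraction=0.5 \
    ~/es_spark-0.0.1.jar <elasticsearch_main_ip>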
There is a startup cost to Spark that makes it relatively inefficient for smaller data loads. You should be able to optimize your startup to much shorter than a minute, but it's still much more practical for extremely large batch loads, not real-time analytics.
I would recommend you use the DataFrame API instead of RDDs. For your simple example operation above it shouldn't matter, but you're more likely to benefit from performance optimizations as things get more complicated.
e.g. sql.read.format("es").load("test3")
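In Java that would look roughly like the sketch below. It assumes Spark 2.x's SparkSession and the short "es" format name registered by the elasticsearch-spark connector; on older setups you'd go through SQLContext or the full org.elasticsearch.spark.sql source name instead.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("Simple Application")
        .config("es.nodes", "<elasticsearch_main_ip>")
        .getOrCreate();

// Read the "test3" index as a DataFrame; with DataFrames the optimizer can
// push projections/filters down instead of shipping whole documents around.
Dataset<Row> esDF = spark.read().format("es").load("test3");
long numES = esDF.count();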
To troubleshoot what's causing the slowness you can take a look at your Spark UI. Were you actually getting parallelism? Did all the jobs execute in roughly the same amount of time? Another possible source of slowness is network issues between your cluster and the ES server.
Upvotes: 0