Reputation: 433
My code's algorithm is as follows:
Step 1. Load one HBase entity's data into hBaseRDD.
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
Step 2. Transform hBaseRDD into rowPairRDD.
// in rowPairRDD, the key is HBase's row key and the value is the HBase row data
JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
.mapToPair(***);
// repartition returns a new RDD, so the result must be assigned
rowPairRDD = rowPairRDD.repartition(500);
rowPairRDD.cache();
Step 3. Transform rowPairRDD into schemaRDD.
// the JavaSQLContext must be created before it is used here
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
schemaRDD.registerTempTable("testentity");
sqlContext.sqlContext().cacheTable("testentity");
Step 4. Use Spark SQL to do the first simple SQL query.
JavaSchemaRDD retRDD = sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value1'");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Step 5. Use Spark SQL to do the second simple SQL query.
JavaSchemaRDD retRDD2 = sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value2'");
List<org.apache.spark.sql.api.java.Row> rows2 = retRDD2.collect();
Step 6. Use Spark SQL to do the third simple SQL query.
JavaSchemaRDD retRDD3 = sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3'");
List<org.apache.spark.sql.api.java.Row> rows3 = retRDD3.collect();
The test results are as follows:
Test case 1:
I insert 300,000 records into the HBase entity and then run the code.
A similar query through the HBase API takes only 2,000 ms, and the last two Spark SQL queries are much quicker than the HBase API query.
I believe the first Spark SQL query spends most of its time loading the data from HBase, which is why it is much slower than the last two queries. I think that result is expected.
Test case 2:
I insert 400,000 records into the HBase entity and then run the code.
A similar query through the HBase API takes only 3,500 ms, but now all three Spark SQL queries are much slower than the HBase API query.
The last two Spark SQL queries are also very slow, with performance similar to the first query. Why? How can I tune the performance?
Upvotes: 7
Views: 7371
Reputation: 10931
I suspect you are trying to cache more data than you have allocated to your Spark instance. I'll try to break down what is going on in each execution of the exact same query.
First of all, everything in Spark is lazy. This means that when you call rdd.cache(), nothing actually happens until you do something with the RDD.
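As a loose analogy (not Spark itself), Java 8 streams behave the same way: intermediate operations just build a pipeline, and nothing executes until a terminal operation runs. A minimal, Spark-free sketch:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
    // Returns how many times the map function ran (a) right after building
    // the pipeline and (b) after the terminal operation.
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();
        // Intermediate op (like a Spark transformation): lazy, nothing runs yet.
        Stream<Integer> doubled = Stream.of(1, 2, 3)
                .map(x -> { calls.incrementAndGet(); return x * 2; });
        int afterMap = calls.get();        // still 0 - the map has not executed
        doubled.reduce(0, Integer::sum);   // terminal op (like a Spark action)
        int afterAction = calls.get();     // now 3 - one call per element
        return new int[]{afterMap, afterAction};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("after map: " + r[0] + ", after action: " + r[1]);
    }
}
```

In the same way, rdd.cache() only marks the RDD for caching; the first action is what actually populates the cache.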
First Query
1. Read the data from HBase.
2. Apply the schema and cache the data.
3. Run the query.
Second/Third Query
1. If the data fit fully in the cache, run the query against the cached data.
2. Otherwise, recompute the uncached partitions from HBase all over again.
Now, Spark will try to cache as much of an RDD as possible. If it can't cache the entire thing, you may run into some serious slowdowns. This is especially true if one of the steps before caching causes a shuffle. In that case you may be repeating steps 1 - 3 of the first query for each subsequent query. That's not ideal.
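The benefit of a fully cached RDD across repeated queries is essentially memoization: pay the expensive computation once, reuse it afterwards. A plain-Java sketch of that idea (no Spark involved; loadData is a hypothetical stand-in for the HBase scan plus transformation steps):

```java
import java.util.function.Supplier;

public class MemoDemo {
    static int loads = 0;

    // Stand-in for the expensive "scan HBase + build schemaRDD" work.
    static String loadData() {
        loads++;
        return "rows";
    }

    // Wraps a supplier so the underlying computation runs at most once -
    // roughly what a fully cached RDD gives you across queries.
    static <T> Supplier<T> memoize(Supplier<T> s) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;
            @Override public T get() {
                if (!computed) { value = s.get(); computed = true; }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Supplier<String> cached = memoize(MemoDemo::loadData);
        cached.get(); // first "query" pays the full load cost
        cached.get(); // later "queries" reuse the result
        cached.get();
        System.out.println("expensive loads: " + loads); // prints "expensive loads: 1"
    }
}
```

When the cache does not fit, it is as if memoization silently fails and every query pays the load cost again, which matches the symptoms in test case 2.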
To see whether you are fully caching an RDD, go to your Spark Web UI (http://localhost:4040 if in local standalone mode) and look at the RDD storage/persistence information. Make sure it is at 100%.
Edit (per comments):

"The 400,000 records only take about 250 MB in my HBase. Why do I need to use 2G to fix the issue (but 1G >> 250 MB)?"

I can't say for certain why you hit your memory limit with spark.executor.memory=1G, but I will add some more relevant information about caching.
By default, only spark.storage.memoryFraction=0.6, or 60%, of executor memory is used for caching. So you are really only getting 1GB * 0.6 for cached data.
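To make that arithmetic concrete, here is the cache budget for a 1 GB executor under the Spark 1.x defaults (spark.storage.memoryFraction=0.6 plus the spark.storage.safetyFraction=0.9 headroom factor; both values assumed here, check your version's configuration docs):

```java
public class CacheBudget {
    // Memory actually available for cached blocks, given the two fractions.
    static double usableCacheMb(double executorMb, double memoryFraction, double safetyFraction) {
        return executorMb * memoryFraction * safetyFraction;
    }

    public static void main(String[] args) {
        // spark.executor.memory=1G with the assumed Spark 1.x default fractions
        double budget = usableCacheMb(1024, 0.6, 0.9);
        System.out.printf("usable cache: ~%.0f MB%n", budget); // prints "usable cache: ~553 MB"
    }
}
```

So roughly 550 MB of a 1 GB executor is actually available for cached blocks, and 250 MB of on-disk HBase data can easily exceed that once deserialized into Java objects.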
On top of that, cached RDDs are stored as Java objects, and object metadata adds per-object overhead. You can change the default persistence level (for example, to a serialized StorageLevel) to shrink the footprint.

"Do you know how to cache all the data to avoid the bad performance for the first query?"
Invoking any action will cause the RDD to be cached. Just do this:
scala> rdd.cache
scala> rdd.count
Now it's cached.
Upvotes: 3
Reputation: 1319
I hope you are running these queries one after another in a single go. If so, why are you creating a separate sqlContext for each query? Also, can you try repartitioning the RDD, which will increase the parallelism? And, if possible, cache the RDD.
Hope the above steps will improve the performance.
Upvotes: 1