simafengyun

Reputation: 433

Spark SQL performance

My code's algorithm is as follows:
Step1. load one HBase entity's data into hBaseRDD

      JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = 
                 jsc.newAPIHadoopRDD(hbase_conf,  TableInputFormat.class,
                 ImmutableBytesWritable.class, Result.class); 

Step2. transform hBaseRDD to rowPairRDD

     // in rowPairRDD the key is the HBase row key, the Row is the HBase row data
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
                            .mapToPair(***);
     // repartition returns a new RDD; reassign it before caching
     rowPairRDD = rowPairRDD.repartition(500);
     rowPairRDD.cache();

Step3. transform rowPairRDD to schemaRDD

    JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
    schemaRDD.registerTempTable("testentity");
    sqlContext.sqlContext().cacheTable("testentity");

Step4. run the first simple Spark SQL query.

    JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    JavaSchemaRDD retRDD = sqlContext.sql(
            "SELECT column1, column2 FROM testentity WHERE column3 = 'value1'");
    List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Step5. run the second simple Spark SQL query.

JavaSchemaRDD retRDD = sqlContext.sql(
        "SELECT column1, column2 FROM testentity WHERE column3 = 'value2'");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Step6. run the third simple Spark SQL query.

JavaSchemaRDD retRDD = sqlContext.sql(
        "SELECT column1, column2 FROM testentity WHERE column3 = 'value3'");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Test results are as follows:

Test case1:

When I insert 300,000 records into the HBase entity and then run the code:

If I use the HBase API to do a similar query, it takes only 2,000 ms. Apparently the last two Spark SQL queries are much quicker than the HBase API query.
I believe the first Spark SQL query spends a lot of time loading the data from HBase,
so it is much slower than the last two queries. I think this result is expected.

Test case2:

When I insert 400,000 records into the HBase entity and then run the code:

If I use the HBase API to do a similar query, it takes only 3,500 ms. Apparently all three Spark SQL queries are much slower than the HBase API query.
And the last two Spark SQL queries are also very slow, with performance similar to the first query. Why? How can I tune the performance?

Upvotes: 7

Views: 7371

Answers (2)

Mike Park

Reputation: 10931

I suspect you are trying to cache more data than you have allocated to your Spark instance. I'll try to break down what is going on in each execution of the exact same query.

First of all, everything in Spark is lazy. This means that when you call rdd.cache(), nothing actually happens until you do something with the RDD.
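This laziness can be illustrated without a cluster. The sketch below uses plain `java.util.stream` (not Spark itself) as an analogy: intermediate operations only build a pipeline, and nothing runs until a terminal operation, just as Spark transformations and `cache()` do nothing until an action.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger mapCalls = new AtomicInteger();

        // Building the pipeline executes nothing yet -- the map function has
        // not run, just as rdd.cache()/rdd.map() run nothing in Spark.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .map(x -> { mapCalls.incrementAndGet(); return x * 2; });
        System.out.println("after building pipeline: " + mapCalls.get() + " calls"); // 0 calls

        // A terminal operation (Spark analogue: an action such as collect()
        // or count()) is what actually triggers the computation.
        List<Integer> doubled = pipeline.collect(Collectors.toList());
        System.out.println("after collect: " + mapCalls.get() + " calls");           // 3 calls
        System.out.println(doubled);                                                 // [2, 4, 6]
    }
}
```

The same reasoning explains why all the expensive work (scan, shuffle, caching) lands inside the first query: that is the first time an action forces the pipeline to run.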

First Query

  1. Full HBase scan (slow)
  2. Increase number of partitions (causes shuffle, slow)
  3. Data is actually cached to memory because Spark is lazy (kind of slow)
  4. Apply where predicate (fast)
  5. Results are collected

Second/Third Query

  1. Full in-memory scan (fast)
  2. Apply where predicate (fast)
  3. Results are collected

Now, Spark will try to cache as much of an RDD as possible. If it can't cache the entire thing, you may run into some serious slowdowns. This is especially true if one of the steps before caching causes a shuffle. You may be repeating steps 1-3 of the first query for each subsequent query. That's not ideal.

To check whether an RDD is fully cached, go to your Spark Web UI (http://localhost:4040 if in local standalone mode) and look at the RDD storage/persistence information. Make sure it is at 100%.

Edit (per comments):

The 400,000 records take only about 250 MB in my HBase. Why do I need to use 2 GB to fix the issue (since 1 GB >> 250 MB)?

I can't say for certain why you hit your max limit with spark.executor.memory=1G, but I will add some more relevant information about caching.

  • Spark only allocates a percentage of the executor's heap memory to caching. By default, this is spark.storage.memoryFraction=0.6, or 60%. So with a 1 GB executor you are really only getting 1 GB * 0.6 for caching.
  • The total space used in HBase likely differs from the heap space taken when caching in Spark. By default, Spark does not serialize Java objects when storing them in memory, so there is a decent amount of overhead from Java object metadata. You can change the default persistence level (e.g. to a serialized storage level).
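As a rough illustration of the first bullet, the cache budget is just executor memory times the storage fraction (a stand-alone arithmetic sketch, not Spark API; 0.6 is the Spark 1.x default for spark.storage.memoryFraction):

```java
public class CacheBudget {
    /** Heap available for RDD caching: executor memory times the storage fraction. */
    static long cacheBudgetBytes(long executorMemoryBytes, double memoryFraction) {
        return (long) (executorMemoryBytes * memoryFraction);
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        long budget = cacheBudgetBytes(oneGb, 0.6);
        // A 1 GB executor leaves only ~614 MB for cached RDDs; 250 MB of raw
        // HBase data stored as deserialized Java objects can easily exceed that.
        System.out.println(budget / (1024 * 1024) + " MB");
    }
}
```

Deserialized Java objects often take several times the space of the raw data, so 250 MB of HBase data can plausibly outgrow a ~614 MB budget, which is consistent with 2 GB being needed.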

Do you know how to cache all the data, to avoid the bad performance of the first query?

Calling cache() only marks the RDD; invoking any action afterwards will actually materialize it. Just do this:

scala> rdd.cache
scala> rdd.count

Now it's cached.

Upvotes: 3

Sachin Janani

Reputation: 1319

I assume you are running these queries one after another in a single run; if so, why are you creating a separate sqlContext for each query? Also, try repartitioning the RDD, which will increase parallelism, and cache the RDD if possible.

I hope these steps will improve the performance.

Upvotes: 1
