Reputation: 333
I am using Spark 2.11 and I am doing only 3 basic operations in my application:
But for these 3 operations it takes almost 20 minutes. If I do the same operations in SQL, it takes less than 1 minute.
I started using Spark because it is supposed to yield results very fast, but it is taking too much time. How can I improve the performance?
Step 1: fetching records from the database.
Properties connectionProperties = new Properties();
connectionProperties.put("user", "test");
connectionProperties.put("password", "test##");
String query="(SELECT * from items)
dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties);
Step 2: checking which records of file A (5k) are present in file B (2M) using contains
Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner");
Step 3: writing the matched records to a CSV file
NewSet.repartition(1).select("*")
.write().format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue", "")
.save(fileAbsolutePath);
To improve the performance I have tried several things, like caching, data serialization
set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
shuffle tuning
sqlContext.setConf("spark.sql.shuffle.partitions", "10"),
and data structure tuning
-XX:+UseCompressedOops,
but none of these approaches is yielding better performance.
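For reference, this is roughly how those settings were applied when creating the SparkSession (the app name below is just a placeholder, not from my actual code):

// Minimal sketch of the configuration attempts described above
SparkSession spark = SparkSession.builder()
        .appName("ItemsMatch")  // placeholder name
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.shuffle.partitions", "10")
        // -XX:+UseCompressedOops is a JVM flag, passed to the executors:
        .config("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
        .getOrCreate();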
Upvotes: 3
Views: 1436
Reputation: 1326
Improving performance is mostly about improving parallelism.
Parallelism depends on the number of partitions of the RDD.
Make sure the Dataset/DataFrame/RDD has neither too many partitions nor too few.
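For example, you can check the current partition count of a Dataset like this (a sketch; df is just a placeholder for any of your Datasets, and 10 is only an example value):

// Inspect the current number of partitions
int numPartitions = df.javaRDD().getNumPartitions();
System.out.println("Current partitions: " + numPartitions);

// Too few partitions -> not enough parallelism; too many -> scheduling overhead.
Dataset<Row> rebalanced = df.repartition(10);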
Please check the suggestions below for where you can improve your code. I'm more comfortable with Scala, so one of the snippets below is in Scala.
Step 1: Make sure you have control over the connections you make to the database by specifying numPartitions.
Number of connections = number of partitions.
Below I just assigned 10 to num_partitions; you will have to tune this value to get more performance.
int num_partitions = 10;  // tune this value for your data size and cluster

Properties connectionProperties = new Properties();
connectionProperties.put("user", "test");
connectionProperties.put("password", "test##");
connectionProperties.put("partitionColumn", "hash_code");

// num_partitions is a Java variable, so it must be concatenated into the SQL text
String query = "(SELECT mod(A.id, " + num_partitions + ") as hash_code, A.* from items A)";

Dataset<Row> dataFileContent = spark.read()
    .jdbc("jdbc:oracle:thin:@//172.20.0.11/devad",
          query,            // dbtable
          "hash_code",      // columnName used for partitioning
          0,                // lowerBound
          num_partitions,   // upperBound
          num_partitions,   // numPartitions
          connectionProperties);
You can check how numPartitions works in the Spark JDBC documentation.
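If you prefer the option-based reader API, the same partitioned read can be expressed as below (a sketch reusing the url, credentials, query, and num_partitions from above):

Dataset<Row> dataFileContent = spark.read()
        .format("jdbc")
        .option("url", "jdbc:oracle:thin:@//172.20.0.11/devad")
        .option("dbtable", query)
        .option("user", "test")
        .option("password", "test##")
        .option("partitionColumn", "hash_code")
        .option("lowerBound", "0")
        .option("upperBound", String.valueOf(num_partitions))
        .option("numPartitions", String.valueOf(num_partitions))
        .load();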
Step 2:
Dataset<Row> NewSet = source.join(target,
target.col("ItemIDTarget").contains(source.col("ItemIDSource")),
"inner");
Since one of the tables/dataframes has only 5k records (a small amount of data), you can use a broadcast join as shown below.
import org.apache.spark.sql.functions.broadcast
val joined_df = largeTableDF.join(broadcast(smallTableDF), "key")
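Since your code is in Java, the same broadcast hint applied to your join would look roughly like this (assuming source is the 5k-row side):

import static org.apache.spark.sql.functions.broadcast;

// Broadcast the small (5k-row) side so the 2M-row side does not have to be shuffled
Dataset<Row> NewSet = target.join(
        broadcast(source),
        target.col("ItemIDTarget").contains(source.col("ItemIDSource")),
        "inner");

Note that contains is not an equality condition, so Spark cannot use a hash join here; broadcasting the small side at least keeps the 2M-row table from being shuffled.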
Step 3: Use coalesce to decrease the number of partitions, which avoids a full shuffle.
NewSet.coalesce(1).select("*")
.write().format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue", "")
.save(fileAbsolutePath);
Hope my answer helps you.
Upvotes: 6