I am new to Scala/Spark (about a week now).
The following code runs on my 8-core, 64-bit Windows 10 laptop. The DataFrame has only 1700 rows, yet ONE select takes over ten seconds.
The console output shows that the main hang occurs at this point:
17/09/02 12:23:46 INFO FileSourceStrategy: Pruning directories with:
The Code
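import org.apache.spark.sql.functions.col

{
  // Derive the index column name, e.g. "1637_1636_1716_SCORE" -> "1637_1636_1716_idx1"
  val major: String = name.substring(0, name.indexOf("_SCORE")) + "_idx1"
  println(major)
  // select only builds a plan; collect() runs the job and pulls every row to the driver
  val majors = dfMergedDroppedDeleted
    .select(col(major))
    .collect()
    .toSeq
  println(s"got majors ${majors.size}")
}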
Based on my experience with Hibernate, R, MySQL etc., this should take milliseconds, so I suspect something is wrong with my Spark configuration.
Any suggestions would be most welcome.
The full console output up to the hang is as follows:
1637_1636_1716_idx1
1637_1636_1716_idx2
17/09/02 12:23:08 INFO ContextCleaner: Cleaned accumulator 765
17/09/02 12:23:08 INFO ContextCleaner: Cleaned accumulator 763
17/09/02 12:23:08 INFO BlockManagerInfo: Removed broadcast_51_piece0 on 192.168.0.13:62246 in memory (size: 113.7 KB, free: 901.6 MB)
17/09/02 12:23:08 INFO ContextCleaner: Cleaned accumulator 761
17/09/02 12:23:08 INFO ContextCleaner: Cleaned accumulator 764
17/09/02 12:23:08 INFO ContextCleaner: Cleaned accumulator 762
17/09/02 12:23:08 INFO ContextCleaner: Cleaned accumulator 766
17/09/02 12:23:08 INFO BlockManagerInfo: Removed broadcast_50_piece0 on 192.168.0.13:62246 in memory (size: 20.7 KB, free: 901.6 MB)
17/09/02 12:23:08 INFO FileSourceStrategy: Pruning directories with:
Putting the dataframe in cache makes a big difference.
val dfMergedDroppedDeletedCached: DataFrame = dfMergedDroppedDeleted.cache()
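However, caching is itself slow: cache() is lazy, so the first action on the cached DataFrame still computes everything and also fills the cache. It only pays off if you perform multiple operations on the same DataFrame.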
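A minimal sketch of the "cache once, query many times" pattern, assuming a local SparkSession and a small in-memory DataFrame in place of the real dfMergedDroppedDeleted. The object and method names (CacheThenQuery, collectColumns) are hypothetical. The first count() pays for the full computation and fills the cache; the later per-column queries then read from the cached copy, and unpersist() releases it afterwards.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CacheThenQuery {
  // Collects the non-null values of several columns from one cached DataFrame.
  def collectColumns(df: DataFrame, columns: Seq[String]): Map[String, Seq[Any]] = {
    val cached = df.cache() // lazy: only marks the plan for caching
    cached.count()          // first action runs the full lineage and fills the cache
    val result = columns.map { c =>
      // Each later action reads the in-memory copy instead of recomputing the lineage.
      c -> cached.filter(col(c).isNotNull).select(col(c)).collect().map(_.get(0)).toSeq
    }.toMap
    cached.unpersist()      // free the cached blocks once this batch of queries is done
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()
    import spark.implicits._
    // Hypothetical stand-in data: two idx columns, one with a missing value.
    val df = Seq[(String, Option[String])](("a", Some("x")), ("b", None))
      .toDF("1637_1636_1716_idx1", "1637_1636_1716_idx2")
    val byColumn = collectColumns(df, df.columns.toSeq)
    byColumn.foreach { case (c, values) => println(s"$c -> ${values.size} values") }
    spark.stop()
  }
}
```

Whether this helps in practice depends on how expensive the upstream plan is; for a DataFrame that is used only once, the extra caching step is pure overhead.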
UPDATE: Credit to Ramesh Maharjan, who wrote in a comment:
The time-consuming part is not the select. Select is distributed in nature and is executed on the local data in each executor. The time-consuming part is the collect: the collect function gathers all the data on the driver node, and that takes a lot of time. That's why it is always recommended not to use collect, and if it is necessary, to use it as little as possible.
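One way to check this claim is to time the transformation and the action separately. The sketch below assumes a local SparkSession and a synthetic 1700-row DataFrame standing in for dfMergedDroppedDeleted; SelectVsCollect, timeMs and compare are hypothetical names. Because select is a lazy transformation, timing it measures only plan construction, while collect() triggers the actual Spark job.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col

object SelectVsCollect {
  // Runs `body` and returns its result together with the elapsed wall-clock time in ms.
  def timeMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  // Times the lazy select and the collect action separately.
  def compare(df: DataFrame, column: String): (Array[Row], Long, Long) = {
    // Transformation: builds a new logical plan, no Spark job is started.
    val (selected, selectMs) = timeMs(df.select(col(column)))
    // Action: executes the whole lineage and ships every row to the driver.
    val (rows, collectMs) = timeMs(selected.collect())
    (rows, selectMs, collectMs)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("select-vs-collect").getOrCreate()
    import spark.implicits._
    // Hypothetical stand-in for dfMergedDroppedDeleted: 1700 rows, one idx column.
    val df = (1 to 1700).map(i => (s"v$i", i)).toDF("1637_1636_1716_idx1", "score")
    val (rows, selectMs, collectMs) = compare(df, "1637_1636_1716_idx1")
    println(s"rows=${rows.length} select=${selectMs}ms collect=${collectMs}ms")
    spark.stop()
  }
}
```

With a DataFrame read from files, the collect timing includes re-running the whole upstream plan (presumably the file scan that FileSourceStrategy logs), not just transferring 1700 rows. In the Scala shell, SparkSession's built-in spark.time { ... } prints a similar measurement.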
I have changed the query as follows:
val majorstr: String = dfMergedDroppedDeletedCached
  .filter(dfMergedDroppedDeletedCached(major).isNotNull)
  .select(col(major))
  .limit(1)
  .first()
  .getString(0)
Not exactly Oracle speeds, but much faster than using collect.
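A possible refinement, offered as a sketch rather than the author's code: first() throws NoSuchElementException when the filter leaves no rows, and it already fetches only a single row (it is implemented via head(1)), so the extra .limit(1) is redundant. Using head(1) directly returns an empty array instead of throwing. The object and method names below are hypothetical, and the cast to string is an assumption so that getString(0) also works for non-string columns.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object FirstNonNull {
  // Returns the first non-null value of `column` as a string, or None if there is none.
  // head(1) brings at most one row to the driver and yields an empty array when nothing
  // matches, whereas first() throws NoSuchElementException on an empty result.
  def firstNonNull(df: DataFrame, column: String): Option[String] =
    df.filter(col(column).isNotNull)
      .select(col(column).cast("string"))
      .head(1)
      .headOption
      .map(_.getString(0))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("first-non-null").getOrCreate()
    import spark.implicits._
    // Hypothetical stand-in data with a null in the first row.
    val df = Seq[(Int, Option[String])]((1, None), (2, Some("b"))).toDF("id", "1637_1636_1716_idx1")
    println(firstNonNull(df, "1637_1636_1716_idx1"))
    spark.stop()
  }
}
```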