Reputation: 181
I use Spark 2.1.1. I do many joins and selects on an input Dataset (inputDs) in a loop, one hour at a time. It looks like this:
val myDs = Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
  getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
}).reduce(_.union(_))
def getDsForOneHour(ds: Dataset[I], year: Int, month: Int, day: Int, hour: Int)(implicit sql: SQLImplicits): Dataset[I] = {
  ds.where(col("year") === year and col("month") === month and col("day") === day and col("hour") === hour)
}
I run that code using spark-testing-base, and it takes about 3 minutes to complete the operations for one month (~30*24 unions and selects). These are all lazy operations, so I'm wondering why it takes Spark so much time to build myDs?
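For reference, one way to see how large the combined logical plan gets (just a diagnostic sketch; queryExecution and numberedTreeString are standard Spark/Catalyst accessors):

// Prints the logical plan of the fully unioned Dataset; for one month the
// tree contains roughly one Filter and one Union node per hour (~720 hours),
// and Spark analyzes the growing plan each time union creates a new Dataset.
println(myDs.queryExecution.logical.numberedTreeString)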
Upvotes: 2
Views: 5210
Reputation: 27373
I guess it's slow because the execution plan is updated for every new dataset unioned in the loop. You could rewrite your code to build up the filter first:
def getFilterForOneHour(year: Int, month: Int, day: Int, hour: Int): Column = {
  col("year") === year and col("month") === month and col("day") === day and col("hour") === hour
}
val myFilter = Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
  getFilterForOneHour(next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
}).reduce(_ or _)
val myDs = inputDs.where(myFilter)
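To make the difference concrete, here is a rough, self-contained benchmark sketch. The SparkSession setup, the dummy one-row-per-hour schema, and the time helper are all mine (not from the question), and the timings include a cheap count() action so both variants actually run:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("union-vs-filter").getOrCreate()
import spark.implicits._

// Dummy in-memory data: one row per hour of a 30-day month.
val inputDs = (0 until 30 * 24)
  .map(h => (2017, 6, h / 24 + 1, h % 24))
  .toDF("year", "month", "day", "hour")

def time[A](label: String)(block: => A): A = {
  val t0 = System.nanoTime()
  val result = block
  println(f"$label: ${(System.nanoTime() - t0) / 1e9}%.2f s")
  result
}

// Variant 1: one union per hour; the analyzer re-checks a growing plan each time.
time("per-hour unions") {
  (0 until 30 * 24)
    .map(h => inputDs.where(col("day") === h / 24 + 1 and col("hour") === h % 24))
    .reduce(_ union _)
    .count()
}

// Variant 2: one combined filter; a single small plan, analyzed once.
time("combined filter") {
  val filter = (0 until 30 * 24)
    .map(h => col("day") === h / 24 + 1 and col("hour") === h % 24)
    .reduce(_ or _)
  inputDs.where(filter).count()
}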
EDIT: You can also try a group-wise union (I used a batch size of 50). I ran some tests with a dummy in-memory dataset, and in my case this improved performance by a factor of 8:
val myDs = Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
  getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
})
  .grouped(50).map(dss => dss.reduce(_ union _))
  .reduce(_ union _)
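Batching helps because it turns one long left-leaning chain of unions into a shallower tree, so each intermediate Dataset the analyzer has to check is smaller. Taking the same idea further, a balanced union tree keeps the depth logarithmic. A sketch (balancedUnion is my own helper, not a Spark API, and it assumes a non-empty sequence):

import org.apache.spark.sql.Dataset

// Recursively splits the sequence in half and unions the two halves,
// producing a union tree of depth O(log n) instead of a chain of depth O(n).
// Assumes dss is non-empty.
def balancedUnion[T](dss: Seq[Dataset[T]]): Dataset[T] =
  if (dss.length == 1) dss.head
  else {
    val (left, right) = dss.splitAt(dss.length / 2)
    balancedUnion(left) union balancedUnion(right)
  }

// Usage with the hourly iterator from above (materialized to a Seq first):
// val myDs = balancedUnion(hourlyDatasets.toSeq)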
Upvotes: 2