Marious
Marious

Reputation: 181

Slow unions on a Dataset

I use Spark 2.1.1. I do many joins and selects on an input DS (inputDs) in a loop by hour it looks like this:

val myDs =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
}).reduce(_.union(_))

def getDsForOneHour(ds: Dataset[I], year:Int, month:Int, day:Int, hour: Int)(implicit sql: SQLImplicits):Dataset[I]= {
ds.where(col("year") === year and col("month") ===  month and col("day") ===  day and col("hour") === hour)
}

I run that code using spark-testing-base and it takes about 3 minutes to complete operations for one month (~30*24 unions&selects). These are all lazy operations I'm wondering why it takes so much time Spark to build myDs ?

Upvotes: 2

Views: 5210

Answers (1)

Raphael Roth
Raphael Roth

Reputation: 27373

I guess its slow because the execution plan is updated for every new dataset unioned in the loop. You could rewrite your code to build up the filter first:

def getFilterForOneHour(year:Int, month:Int, day:Int, hour: Int): Column = {
  col("year") === year and col("month") ===  month and col("day") ===  day and col("hour") === hour
} 


val myFilter =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
getFilterForOneHour(next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
}).reduce(_ or _)

val myDs = inputDs.where(myFilter)

EDIT: What you can also try is to do a group-wise union (my case with a batch-size of 50). I've run some tests wich a dummy in-memory dataset, and this improved performance by a factor of 8 in my case:

val myDs =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
})
.grouped(50).map(dss => dss.reduce(_ union _))
.reduce(_ union _)

Upvotes: 2

Related Questions