Reputation: 1387
My function takes a DateType input. If the input date falls on a Saturday or Sunday, I want the next weekday (Monday); otherwise it should return the next day's date.
Examples: Input: Monday 1/2/2017, output: 1/3/2017 (which is Tuesday). Input: Saturday 3/4/2017, output: 3/6/2017 (which is Monday).
I have gone through https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html but I don't see a ready-made function, so I think one will need to be created.
This is what I have so far:
val nextWeekDate = udf {(startDate: DateType) =>
  val day= date_format(startDate,'E'
  if(day=='Sat' or day=='Sun'){
    nextWeekDate = next_day(startDate,'Mon')
  }
  else{
    nextWeekDate = date_add(startDate, 1)
  }
}
I need help making this valid and working.
Upvotes: 3
Views: 1697
Reputation: 19308
Here's a much simpler answer that's defined in spark-daria:
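import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def nextWeekday(col: Column): Column = {
  // dayofweek (Spark 2.3+) returns 1 for Sunday through 7 for Saturday
  val d = dayofweek(col)
  val friday = lit(6)
  val saturday = lit(7)
  when(col.isNull, null)
    .when(d === friday || d === saturday, next_day(col, "Mon"))
    .otherwise(date_add(col, 1))
}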
Stick with the native Spark functions whenever possible: Catalyst can optimize them, whereas a UDF is a black box to the optimizer. This post explains the derivation of this function in greater detail.
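The constants 6 and 7 rely on Spark's dayofweek numbering, where 1 is Sunday and 7 is Saturday. As a rough sanity check that needs no Spark session, the same rule can be modelled with java.time. The names NextWeekdayModel, sparkDayOfWeek and nextWeekdayModel are made up for this sketch and are not part of spark-daria:

```scala
import java.time.LocalDate

object NextWeekdayModel {
  // java.time numbers days 1 = Monday ... 7 = Sunday;
  // Spark's dayofweek uses 1 = Sunday ... 7 = Saturday, so shift accordingly.
  def sparkDayOfWeek(d: LocalDate): Int = d.getDayOfWeek.getValue % 7 + 1

  // Hypothetical plain-Scala mirror of the column expression above.
  def nextWeekdayModel(d: LocalDate): LocalDate =
    sparkDayOfWeek(d) match {
      case 6 => d.plusDays(3) // Friday   -> following Monday
      case 7 => d.plusDays(2) // Saturday -> following Monday
      case _ => d.plusDays(1) // Sunday through Thursday -> next day
    }
}
```

Note that under this rule a Friday maps to Monday, which is stricter than the question's "next day unless the input is on the weekend".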
Upvotes: 1
Reputation: 4123
Using dates as strings:
import java.time.{DayOfWeek, LocalDate}
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Adjust the pattern if your dates use a different format
object MyFormat {
  val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
}

object MainSample {
  import MyFormat._

  def main(args: Array[String]): Unit = {
    // Create the session first; spark.implicits._ can only be imported from an existing session
    val spark: SparkSession =
      SparkSession
        .builder()
        .appName("YourApp")
        .config("spark.master", "local")
        .getOrCreate()
    import spark.implicits._

    val someData = Seq(
      Row(1, "2013-01-30"),
      Row(2, "2012-01-01")
    )
    val schema = List(StructField("id", IntegerType), StructField("date", StringType))
    val sourceDF = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(schema))
    sourceDF.show()

    val _udf = udf { (dt: String) =>
      // Parse the date; dt is a string
      val localDate = LocalDate.parse(dt, formatter)
      // Saturday skips to Monday; every other day, Sunday included, moves forward one day
      val newDate =
        if (localDate.getDayOfWeek == DayOfWeek.SATURDAY) localDate.plusDays(2)
        else localDate.plusDays(1)
      newDate.toString
    }
    sourceDF.withColumn("NewDate", _udf($"date")).show()
  }
}
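Since the question starts from a DateType column rather than a string, the same idea can be written against java.sql.Date, which is the type a Scala UDF receives for DateType by default. This is a minimal sketch, assuming the question's rule (Saturday and Sunday both map to Monday, every other day to the following day); the names NextDayRule and nextDay are illustrative:

```scala
import java.sql.Date
import java.time.DayOfWeek

object NextDayRule {
  // Null-safe: a null input yields null, since a UDF may be handed null values.
  def nextDay(d: Date): Date =
    if (d == null) null
    else {
      val local = d.toLocalDate
      // Saturday needs two days to reach Monday; every other day needs one.
      val shift = local.getDayOfWeek match {
        case DayOfWeek.SATURDAY => 2L
        case _                  => 1L
      }
      Date.valueOf(local.plusDays(shift))
    }
}
```

Keeping the rule in a plain function makes it easy to test without Spark; it could then be wrapped as udf(NextDayRule.nextDay _) and applied to the date column.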
Upvotes: 3