user2441441

Reputation: 1387

Get next week date in Spark Dataframe using scala

I have a DateType input to the function. I would like to exclude Saturday and Sunday: if the input date falls on a weekend, the function should return the next weekday; otherwise it should return the next day's date.

Example:
Input: Monday 1/2/2017, output: 1/3/2017 (which is Tuesday)
Input: Saturday 3/4/2017, output: 3/6/2017 (which is Monday)

I have gone through https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html but I don't see a ready made function, so I think it will need to be created.

So far I have something like this:

val nextWeekDate = udf { (startDate: DateType) =>
  val day = date_format(startDate, "E")
  if (day == "Sat" || day == "Sun") {
    next_day(startDate, "Mon")
  } else {
    date_add(startDate, 1)
  }
}

Need help to get it valid and working.

Upvotes: 3

Views: 1697

Answers (2)

Powers

Reputation: 19308

Here's a much simpler answer that's defined in spark-daria:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{date_add, dayofweek, lit, next_day, when}

def nextWeekday(col: Column): Column = {
  // dayofweek (Spark 2.3+) numbers days Sunday = 1 through Saturday = 7
  val d = dayofweek(col)
  val friday = lit(6)
  val saturday = lit(7)
  when(col.isNull, null)
    .when(d === friday || d === saturday, next_day(col, "Mon"))
    .otherwise(date_add(col, 1))
}

You always want to stick with the native Spark functions whenever possible. This post explains the derivation of this function in greater detail.
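As a sanity check on the numbering the function relies on: Spark's `dayofweek` returns Sunday = 1 through Saturday = 7, so Friday is 6 and Saturday is 7. The same numbering can be reproduced from `java.time` without a Spark session (this helper is my own sketch, not part of spark-daria, where `DayOfWeek.getValue` counts Monday = 1 through Sunday = 7):

```scala
import java.time.LocalDate

// Convert java.time's Monday=1..Sunday=7 numbering into
// Spark dayofweek's Sunday=1..Saturday=7 numbering.
def sparkDayOfWeek(d: LocalDate): Int = d.getDayOfWeek.getValue % 7 + 1
```

This makes it easy to unit-test the Friday/Saturday branch of `nextWeekday` against known calendar dates.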

Upvotes: 1

Emiliano Martinez

Reputation: 4123

Using dates as strings:

import java.time.{DayOfWeek, LocalDate}
import java.time.format.DateTimeFormatter

// If that is your date format
object MyFormat {
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
}

object MainSample {
  import MyFormat._

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    implicit val spark: SparkSession =
      SparkSession
        .builder()
        .appName("YourApp")
        .config("spark.master", "local")
        .getOrCreate()

    import spark.implicits._

    val someData = Seq(
      Row(1, "2013-01-30"),
      Row(2, "2012-01-01")
    )

    val schema = List(StructField("id", IntegerType), StructField("date", StringType))
    val sourceDF = spark.createDataFrame(spark.sparkContext.parallelize(someData), StructType(schema))

    sourceDF.show()

    val _udf = udf { (dt: String) =>
      // Parse the incoming date string
      val localDate = LocalDate.parse(dt, formatter)

      // Saturday jumps two days to Monday; every other day moves one day
      // forward (Sunday plus one day is already Monday)
      val newDate =
        if (localDate.getDayOfWeek == DayOfWeek.SATURDAY) localDate.plusDays(2)
        else localDate.plusDays(1)

      newDate.toString
    }

    sourceDF.withColumn("NewDate", _udf('date)).show()
  }
}
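The UDF's branching can be exercised without spinning up a Spark session. Here is the same date-shifting logic as a plain function (the name `shiftDate` is mine, chosen for illustration):

```scala
import java.time.{DayOfWeek, LocalDate}
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

// Same logic as the UDF body: Saturday -> Monday (+2 days),
// Sunday -> Monday (+1 day), any weekday -> next day (+1 day).
def shiftDate(dt: String): String = {
  val d = LocalDate.parse(dt, fmt)
  val shifted = d.getDayOfWeek match {
    case DayOfWeek.SATURDAY => d.plusDays(2)
    case _                  => d.plusDays(1)
  }
  shifted.toString
}
```

Keeping the logic in a pure function like this makes it straightforward to unit-test before wrapping it in `udf`.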

Upvotes: 3
