Cassie

Reputation: 3099

Split dataframe by column values in Scala

I need to split a dataframe into multiple dataframes by the timestamp column: given a number of hours, I want to get back a set of dataframes, each covering that many hours of data.

The signature of the method would look like this:

def splitDataframes(df: DataFrame, hoursNumber: Int): Seq[DataFrame]

How can I achieve that?

The schema of the dataframe looks like this:

root
 |-- date_time: integer (nullable = true)
 |-- user_id: long (nullable = true)
 |-- order_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_date: date (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- event_hour: long (nullable = true)

Some of the input df fields:

event_ts, user_id
2020-12-13 08:22:00, 1
2020-12-13 08:51:00, 2
2020-12-13 09:28:00, 1
2020-12-13 10:53:00, 3
2020-12-13 11:05:00, 1
2020-12-13 12:19:00, 2

Some of the output df fields with hoursNumber=2:

df1   event_ts, user_id
      2020-12-13 08:22:00, 1
      2020-12-13 08:51:00, 2
      2020-12-13 09:28:00, 1
df2   2020-12-13 10:53:00, 3
      2020-12-13 11:05:00, 1
df3   2020-12-13 12:19:00, 2

Upvotes: 0

Views: 661

Answers (1)

mck

Reputation: 42332

Convert the timestamp to a unix timestamp, then work out a group id for each row from its time difference to the earliest timestamp.
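
For instance, a minimal sketch of that first variant (assuming the same imports as in the full solution below, plus org.apache.spark.sql.functions.min):

// Earliest timestamp in the dataframe, in seconds since the epoch
val minTs = df.agg(min(unix_timestamp($"event_ts"))).head.getLong(0)

// Bucket each row by its offset from the earliest timestamp
val withGroup = df.withColumn(
    "grouping",
    floor((unix_timestamp($"event_ts") - minTs) / (3600 * hoursNumber))
)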

EDIT: the solution is even simpler if you count from 00:00:00 instead of from the earliest timestamp:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{floor, unix_timestamp}
import spark.implicits._  // for the $"..." column syntax; assumes a SparkSession named `spark` is in scope

def splitDataframes(df: DataFrame, hoursNumber: Int): Seq[DataFrame] = {
    // Convert the timestamp to seconds since the epoch, then assign each row
    // a bucket id by integer division with the window length in seconds.
    val df2 = df.withColumn(
        "event_unix_ts",
        unix_timestamp($"event_ts")
    ).withColumn(
        "grouping",
        floor($"event_unix_ts" / (3600 * hoursNumber))
    ).drop("event_unix_ts")

    // Build one dataframe per distinct bucket id.
    df2.select("grouping").distinct().collect().map(
        x => df2.filter($"grouping" === x(0)).drop("grouping")).toSeq
}
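
To check it against the sample data in the question, something like the following should work (a minimal sketch, assuming a SparkSession named `spark` and org.apache.spark.sql.functions.to_timestamp):

import spark.implicits._
import org.apache.spark.sql.functions.to_timestamp

// Recreate the sample rows from the question
val df = Seq(
    ("2020-12-13 08:22:00", 1L),
    ("2020-12-13 08:51:00", 2L),
    ("2020-12-13 09:28:00", 1L),
    ("2020-12-13 10:53:00", 3L),
    ("2020-12-13 11:05:00", 1L),
    ("2020-12-13 12:19:00", 2L)
).toDF("event_ts_str", "user_id")
 .withColumn("event_ts", to_timestamp($"event_ts_str"))
 .drop("event_ts_str")

splitDataframes(df, 2).foreach(_.show(false))

With hoursNumber = 2 this yields the three dataframes from the question: the 08:22/08:51/09:28 rows, the 10:53/11:05 rows, and the 12:19 row. Note that the bucket boundaries are computed from epoch time in the session time zone, and the order of the returned dataframes isn't guaranteed, since it comes from distinct().collect().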

Upvotes: 1
