Reputation: 3099
I need to split a dataframe into multiple dataframes by the timestamp column. I would pass the number of hours each resulting dataframe should cover and get back a set of dataframes, each spanning that many hours.
The signature of the method would look like this:
def splitDataframes(df: DataFrame, hoursNumber: Int): Seq[DataFrame]
How can I achieve that?
The schema of the dataframe looks like this:
root
|-- date_time: integer (nullable = true)
|-- user_id: long (nullable = true)
|-- order_id: string (nullable = true)
|-- description: string (nullable = true)
|-- event_date: date (nullable = true)
|-- event_ts: timestamp (nullable = true)
|-- event_hour: long (nullable = true)
Some of the input df fields:
event_ts, user_id
2020-12-13 08:22:00, 1
2020-12-13 08:51:00, 2
2020-12-13 09:28:00, 1
2020-12-13 10:53:00, 3
2020-12-13 11:05:00, 1
2020-12-13 12:19:00, 2
Some of the output df fields with hoursNumber=2:
df1: event_ts, user_id
2020-12-13 08:22:00, 1
2020-12-13 08:51:00, 2
2020-12-13 09:28:00, 1

df2: event_ts, user_id
2020-12-13 10:53:00, 3
2020-12-13 11:05:00, 1

df3: event_ts, user_id
2020-12-13 12:19:00, 2
Upvotes: 0
Views: 661
Reputation: 42332
Convert the timestamp to a Unix timestamp, then work out a group id for each row from its time difference to the earliest timestamp.
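For that general case (bucketing relative to the earliest timestamp rather than the epoch), a minimal sketch might look like this; the helper name groupFromEarliest is illustrative and only the event_ts column from the question's schema is assumed:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, floor, lit, min, unix_timestamp}

// Hypothetical helper: bucket rows relative to the earliest event_ts in the DataFrame.
def groupFromEarliest(df: DataFrame, hoursNumber: Int): DataFrame = {
  // Earliest timestamp in the whole DataFrame, as seconds since the epoch.
  val minTs = df.agg(min(unix_timestamp(col("event_ts")))).head().getLong(0)
  df.withColumn(
    "grouping",
    floor((unix_timestamp(col("event_ts")) - lit(minTs)) / (3600 * hoursNumber))
  )
}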
EDIT: the solution is even simpler if you count the starting time from 00:00:00.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, floor, unix_timestamp}

def splitDataframes(df: DataFrame, hoursNumber: Int): Seq[DataFrame] = {
  // Assign each row to a bucket of `hoursNumber` hours, counted from the epoch.
  val df2 = df.withColumn(
    "event_unix_ts",
    unix_timestamp(col("event_ts"))
  ).withColumn(
    "grouping",
    floor(col("event_unix_ts") / (3600 * hoursNumber))
  ).drop("event_unix_ts")

  // Return one DataFrame per distinct bucket.
  df2.select("grouping").distinct().collect().map(
    x => df2.filter(col("grouping") === x(0)).drop("grouping")
  ).toSeq
}
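A hypothetical usage sketch, assuming a SparkSession named spark and data shaped like the question's sample input:

import org.apache.spark.sql.functions.to_timestamp
import spark.implicits._

// Build a small DataFrame resembling the question's input.
val df = Seq(
  ("2020-12-13 08:22:00", 1L),
  ("2020-12-13 10:53:00", 3L),
  ("2020-12-13 12:19:00", 2L)
).toDF("event_ts_str", "user_id")
  .withColumn("event_ts", to_timestamp($"event_ts_str"))
  .drop("event_ts_str")

val parts = splitDataframes(df, 2)
parts.foreach(_.show(false)) // one DataFrame per 2-hour bucket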
Upvotes: 1