Reputation: 1054
I have some data on a machine. While it runs, it creates at least one entry every 5 seconds, each containing a timestamp field. I want to know how long the machine is on, i.e. the stretch of time between the first and the last entry of each run.
I was thinking of ordering the data set by the timestamp and then aggregating it (?) by taking the current value and the previous value (or a zeroValue when there is no previous value), creating two new columns, 'timestamp_start' and 'timestamp_now', with the following idea:
If the 'timestamp' column is MORE than 5 seconds after the 'timestamp_now' of the previous entry, then both 'timestamp_start' AND 'timestamp_now' become the 'timestamp' of the current value.
If the 'timestamp' column is LESS than or equal to 5 seconds after the 'timestamp_now' of the previous entry, then 'timestamp_start' is copied from the previous value and 'timestamp_now' becomes the 'timestamp' of the current value.
After that I would take the maximum 'timestamp_now' for each 'timestamp_start' and map each pair to a duration value. With this idea I should get a list of duration values indicating the running time of the machine each time it is turned on.
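As a concrete, made-up example of what I'm after:
# Hypothetical input: one timestamp per entry, at most 5 seconds apart while the machine runs.
timestamps = [
    "2021-10-10 08:00:00", "2021-10-10 08:00:04", "2021-10-10 08:00:08",  # first run
    "2021-10-10 09:30:00", "2021-10-10 09:30:05",                         # second run
]
# Expected result: one duration per run, here [8, 5] seconds.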
I feel like I would have to use a fold, agg, or reduce somewhere here, but I'm not sure which one or how. Another option I had in mind was using something like a sliding window and then doing a map, but I'm not sure if that's an option.
I'm using Spark for the first time, so please bear with me. This is what I have so far:
DataQuery.builder(spark).variables() \
    .system('XXX') \
    .nameLike('XXX%XXX%') \
    .timeWindow('2021-10-10 00:00:00.000', '2022-11-28 00:00:00.000') \
    .build() \
    .orderBy('timestamp') \
    .agg('timestamp', ...)  # How do I get to the previous entry?
EDIT:
I got a lot farther:
df = DataQuery.builder(spark).variables() \
    .system('XXX') \
    .nameLike('XXX') \
    .timeWindow('2021-08-10 00:00:00.000', '2022-11-28 00:00:00.000') \
    .build()

timestamps = df.sort('timestamp') \
    .select(psf.from_unixtime('nxcals_timestamp').alias('ts'))
    # AT LEAST I HOPE THIS LINE IS RIGHT (?)

window = timestamps.groupBy(psf.session_window('ts', '10 minutes')) \
    .agg(psf.min(timestamps.ts))

window_timestamps = window.select(
    window.session_window.start.cast("string").alias("start"),
    window.session_window.end.cast("string").alias("end"))
and then the show() function returns:
+--------------------+--------------------+
| start| end|
+--------------------+--------------------+
|-290308-12-21 20:...|-290308-12-21 20:...|
|-290308-12-23 17:...|-290308-12-23 17:...|
|-290308-12-25 06:...|-290308-12-25 06:...|
|-290308-12-25 15:...|-290308-12-25 15:...|
|-290307-01-01 05:...|-290307-01-01 05:...|
|-290307-01-04 06:...|-290307-01-04 06:...|
|-290307-01-04 19:...|-290307-01-04 19:...|
|-290307-01-05 05:...|-290307-01-05 05:...|
|-290307-01-05 08:...|-290307-01-05 08:...|
|-290307-01-06 00:...|-290307-01-06 00:...|
|-290307-01-10 07:...|-290307-01-10 07:...|
|-290307-01-14 11:...|-290307-01-14 11:...|
|-290307-01-15 03:...|-290307-01-15 04:...|
|-290307-01-15 08:...|-290307-01-15 08:...|
|-290307-01-15 13:...|-290307-01-15 13:...|
|-290307-01-16 17:...|-290307-01-16 17:...|
|-290307-01-20 16:...|-290307-01-20 16:...|
|-290307-01-24 19:...|-290307-01-24 19:...|
|-290307-01-26 17:...|-290307-01-26 17:...|
|-290307-01-30 23:...|-290307-01-30 23:...|
+--------------------+--------------------+
There's just one line I'm not completely sure about, but it seems to return the right data. Now I only need to map that data to a single column with the time differences. I'm currently trying:
diff = window_timestamps.rdd.map(lambda row: row.end.cast('long') - row.start.cast('long')).toDF(["diff_in_seconds"])
But that seems to hang.
EDIT2: Nope, doesn't seem to work.
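A possible alternative I haven't tried yet would be to stay in the DataFrame API instead of dropping down to an RDD, and compute the difference directly from the session_window struct on the window DataFrame (untested sketch):
# Untested idea: subtract the session start from the session end after casting
# the timestamps to long (seconds since the epoch).
diff = window.select(
    (psf.col("session_window.end").cast("long")
     - psf.col("session_window.start").cast("long")).alias("diff_in_seconds")
)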
Upvotes: 0
Views: 99
Reputation: 4098
The logic, demonstrated below on a small sample DataFrame, is: use lag to compare each timestamp with the previous one, mark a new start whenever the gap is at least 5 seconds, forward-fill the start values, and then take the maximum timestamp per start.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(
    data=[["2022-12-19 08:00:00"], ["2022-12-19 08:00:04"], ["2022-12-19 08:00:08"],
          ["2022-12-19 08:10:00"], ["2022-12-19 08:10:04"], ["2022-12-19 08:10:08"],
          ["2022-12-19 08:10:12"], ["2022-12-19 09:00:00"]],
    schema=["timestamp"])

df = df.withColumn("timestamp", F.to_timestamp("timestamp", format="yyyy-MM-dd HH:mm:ss"))

# Need some "key" to partition the data and order by "timestamp".
df = df.withColumn("dummy_key", F.lit("0"))
w = Window.partitionBy("dummy_key").orderBy("timestamp")
df = df.withColumn("prev_ts", F.lag("timestamp", offset=1).over(w)) \
       .withColumn("ts_diff", F.col("timestamp").cast("long") - F.col("prev_ts").cast("long"))

# Compute initial start timestamp values.
df = df.withColumn("start", F.when(F.col("prev_ts").isNull(), F.col("timestamp"))
                             .when(F.col("ts_diff") >= 5, F.col("timestamp")))

# Window to forward fill (ffill) start values.
w2 = Window.partitionBy("dummy_key").orderBy("timestamp") \
           .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("start", F.last("start", ignorenulls=True).over(w2))

# Compute running current timestamp values.
df = df.withColumn("prev_start", F.lag("start", offset=1).over(w))
df = df.withColumn("start", F.when(F.col("ts_diff") < 5, F.col("prev_start"))
                             .otherwise(F.col("start")))
df = df.withColumn("now", F.col("timestamp"))

# For each start, compute its end timestamp.
df = df.groupBy("start") \
       .agg(F.max("now").alias("end")) \
       .withColumn("run_period_in_seconds", F.col("end").cast("long") - F.col("start").cast("long")) \
       .sort("start")
Output:
+-------------------+-------------------+---------------------+
|start |end |run_period_in_seconds|
+-------------------+-------------------+---------------------+
|2022-12-19 08:00:00|2022-12-19 08:00:08|8 |
|2022-12-19 08:10:00|2022-12-19 08:10:12|12 |
|2022-12-19 09:00:00|2022-12-19 09:00:00|0 |
+-------------------+-------------------+---------------------+
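If what you ultimately need is the total running time rather than the per-run durations, you can simply sum the last column, for example:
# Optional: total running time over all runs, in seconds.
df.agg(F.sum("run_period_in_seconds").alias("total_run_seconds")).show()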
Upvotes: 1