Typhaon

Reputation: 1054

How do I group by depending on the previous value in Spark?

I have some data from a machine. While it runs, it creates at least one entry every 5 seconds, each containing a timestamp field. I want to know how long the machine is on, i.e. the stretch between the first entry and the last entry of each run.

I was thinking of ordering the data set by the timestamp and then aggregating it by taking the current value and the previous value (or a zeroValue when there's no previous value), creating two new columns 'timestamp_start' and 'timestamp_now' with the following idea:

If the 'timestamp' of the current entry is MORE than 5 seconds after the 'timestamp_now' of the previous entry, then both 'timestamp_start' AND 'timestamp_now' become the 'timestamp' of the current entry.

If the 'timestamp' of the current entry is LESS than or equal to 5 seconds after the 'timestamp_now' of the previous entry, then 'timestamp_start' is copied from the previous entry and 'timestamp_now' becomes the 'timestamp' of the current entry.

After that I would take the maximum 'timestamp_now' for each 'timestamp_start', and then map each pair to a duration value. With this idea I should get a list of duration values indicating the running time of the machine each time it's turned on.

I feel like I need a fold, agg, or reduce somewhere here, but I'm not sure which one or how. Another option I had in mind was using something like a sliding window and then doing a map, but I'm not sure if that's an option.
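
In plain Python over an already-sorted list of epoch seconds, the logic I have in mind would look roughly like this (an illustrative sketch only; group_runs and gap are made-up names, not Spark API):

def group_runs(timestamps, gap=5):
    # Group sorted epoch seconds into runs separated by more than `gap` seconds.
    runs = []
    for ts in timestamps:
        if runs and ts - runs[-1][1] <= gap:
            runs[-1][1] = ts        # close enough: extend 'timestamp_now' of the current run
        else:
            runs.append([ts, ts])   # too far apart: new run, 'timestamp_start' == 'timestamp_now'
    return [now - start for start, now in runs]   # one duration per run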

I'm using Spark for the first time, so bear with me please. This is what I've got:

DataQuery.builder(spark).variables() \
    .system('XXX') \
    .nameLike('XXX%XXX%') \
    .timeWindow('2021-10-10 00:00:00.000', '2022-11-28 00:00:00.000') \
    .build() \
    .orderBy('timestamp') \
    .agg('timestamp')  # How do I get to the previous entry here?

EDIT:

I got a lot further:

import pyspark.sql.functions as psf

df = DataQuery.builder(spark).variables() \
    .system('XXX') \
    .nameLike('XXX') \
    .timeWindow('2021-08-10 00:00:00.000', '2022-11-28 00:00:00.000') \
    .build()

timestamps = df.sort('timestamp') \
    .select(psf.from_unixtime('nxcals_timestamp').alias('ts'))

# AT LEAST I HOPE THIS LINE IS RIGHT (?)
window = timestamps.groupBy(psf.session_window('ts', '10 minutes')) \
    .agg(psf.min(timestamps.ts))

window_timestamps = window.select(
    window.session_window.start.cast('string').alias('start'),
    window.session_window.end.cast('string').alias('end'))

and then the show() function will return:

+--------------------+--------------------+
|               start|                 end|
+--------------------+--------------------+
|-290308-12-21 20:...|-290308-12-21 20:...|
|-290308-12-23 17:...|-290308-12-23 17:...|
|-290308-12-25 06:...|-290308-12-25 06:...|
|-290308-12-25 15:...|-290308-12-25 15:...|
|-290307-01-01 05:...|-290307-01-01 05:...|
|-290307-01-04 06:...|-290307-01-04 06:...|
|-290307-01-04 19:...|-290307-01-04 19:...|
|-290307-01-05 05:...|-290307-01-05 05:...|
|-290307-01-05 08:...|-290307-01-05 08:...|
|-290307-01-06 00:...|-290307-01-06 00:...|
|-290307-01-10 07:...|-290307-01-10 07:...|
|-290307-01-14 11:...|-290307-01-14 11:...|
|-290307-01-15 03:...|-290307-01-15 04:...|
|-290307-01-15 08:...|-290307-01-15 08:...|
|-290307-01-15 13:...|-290307-01-15 13:...|
|-290307-01-16 17:...|-290307-01-16 17:...|
|-290307-01-20 16:...|-290307-01-20 16:...|
|-290307-01-24 19:...|-290307-01-24 19:...|
|-290307-01-26 17:...|-290307-01-26 17:...|
|-290307-01-30 23:...|-290307-01-30 23:...|
+--------------------+--------------------+

There's just one line I'm not completely sure about, but it seems to return the right data. Now I only need to map that data to a single column with the time differences. I'm currently trying:

diff = window_timestamps.rdd.map(lambda row: row.end.cast('long') - row.start.cast('long')).toDF(["diff_in_seconds"])

But that seems to hang.

EDIT2: Nope, doesn't seem to work.
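
For reference: row.end inside the RDD lambda is a plain Python string, so it has no cast method, which is presumably why this fails. A column expression on the session_window struct would be the idiomatic route; a sketch, assuming the window frame from the edit above (and bearing in mind that session_window.end extends one gap duration, here 10 minutes, past the last event):

diff = window.select(
    (window.session_window.end.cast('long')
     - window.session_window.start.cast('long')).alias('diff_in_seconds'))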

Upvotes: 0

Views: 99

Answers (1)

Azhar Khan

Reputation: 4098

The logic is:

  • Create a window "w" by partitioning the data on some "key" and ordering by "timestamp".
  • Lag the timestamp by an offset of one to use it as the "previous" timestamp.
  • Compute the start timestamp as per the logic described in the source code comments.
  • Finally, group by the start timestamp and take the latest end timestamp for each group.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(data=[
    ["2022-12-19 08:00:00"],
    ["2022-12-19 08:00:04"],
    ["2022-12-19 08:00:08"],
    ["2022-12-19 08:10:00"],
    ["2022-12-19 08:10:04"],
    ["2022-12-19 08:10:08"],
    ["2022-12-19 08:10:12"],
    ["2022-12-19 09:00:00"],
], schema=["timestamp"])

df = df.withColumn("timestamp", F.to_timestamp("timestamp", format="yyyy-MM-dd HH:mm:ss"))

# Need some "key" to partition the data and order by "timestamp".
df = df.withColumn("dummy_key", F.lit("0"))
w = Window.partitionBy("dummy_key").orderBy("timestamp")
df = df.withColumn("prev_ts", F.lag("timestamp", offset=1).over(w)) \
       .withColumn("ts_diff", (F.col("timestamp").cast("long") - F.col("prev_ts").cast("long")))

# Compute initial start timestamp values. Per the question, a gap of MORE than
# 5 seconds starts a new run; a gap of exactly 5 seconds continues the current run.
df = df.withColumn("start", F.when(F.col("prev_ts").isNull(), F.col("timestamp")) \
                             .when(F.col("ts_diff") > 5, F.col("timestamp")))

# Window to forward fill (ffill) start values.
w2 = Window.partitionBy("dummy_key").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("start", F.last("start", ignorenulls=True).over(w2))

# Rows that continue a run inherit the start of the previous row; then track
# the current timestamp as the running "now".
df = df.withColumn("prev_start", F.lag("start", offset=1).over(w))
df = df.withColumn("start", F.when(F.col("ts_diff") <= 5, F.col("prev_start")) \
                             .otherwise(F.col("start")))
df = df.withColumn("now", F.col("timestamp"))

# For each start, compute its end timestamp.
df = df.groupBy("start") \
       .agg(F.max("now").alias("end")) \
       .withColumn("run_period_in_seconds", F.col("end").cast("long") - F.col("start").cast("long")) \
       .sort("start")

Output:

+-------------------+-------------------+---------------------+
|start              |end                |run_period_in_seconds|
+-------------------+-------------------+---------------------+
|2022-12-19 08:00:00|2022-12-19 08:00:08|8                    |
|2022-12-19 08:10:00|2022-12-19 08:10:12|12                   |
|2022-12-19 09:00:00|2022-12-19 09:00:00|0                    |
+-------------------+-------------------+---------------------+
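
One caveat on the design: the constant dummy_key routes every row through a single window partition, which is fine for modest volumes but won't scale to large datasets. On Spark 3.2+, the same grouping can be expressed natively with session_window, which the question's edit already experiments with. A sketch under that assumption, starting from df right after the to_timestamp step above; note that session_window.end extends one gap duration past the last event, so the true end of a run is max("timestamp"), and the gap value should be tuned to the desired boundary semantics:

runs = df.groupBy(F.session_window("timestamp", "5 seconds")) \
    .agg(F.max("timestamp").alias("end")) \
    .select(
        F.col("session_window.start").alias("start"),
        "end",
        (F.col("end").cast("long")
         - F.col("session_window.start").cast("long")).alias("run_period_in_seconds")) \
    .sort("start")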

Upvotes: 1
