Hardik Gupta

Reputation: 4790

Calculate time difference between consecutive rows in pairs per group in pyspark

I want to calculate the time spent per SeqID for each user. I have a dataframe like the one below. However, the time is split between two actions for every user, Action_A and Action_B. The total time per user, per SeqID, is the sum across all such pairs.

For the first user, it is 5 + 3 [(2019-12-10 10:05:00 - 2019-12-10 10:00:00) + (2019-12-10 10:23:00 - 2019-12-10 10:20:00)]

So the first user has ideally spent 8 mins for SeqID 15 (and not 23 mins).

Similarly, user 2 has spent 1 + 5 = 6 mins.

How can I calculate this using pyspark?

data = [("ID1", 15, "2019-12-10 10:00:00", "Action_A"),
        ("ID1", 15, "2019-12-10 10:05:00", "Action_B"),
        ("ID1", 15, "2019-12-10 10:20:00", "Action_A"),
        ("ID1", 15, "2019-12-10 10:23:00", "Action_B"),
        ("ID2", 23, "2019-12-10 11:10:00", "Action_A"),
        ("ID2", 23, "2019-12-10 11:11:00", "Action_B"),
        ("ID2", 23, "2019-12-10 11:30:00", "Action_A"),
        ("ID2", 23, "2019-12-10 11:35:00", "Action_B")]
df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])
df.show()

+---+-----+-------------------+--------+
| ID|SeqID|          Timestamp|  Action|
+---+-----+-------------------+--------+
|ID1|   15|2019-12-10 10:00:00|Action_A|
|ID1|   15|2019-12-10 10:05:00|Action_B|
|ID1|   15|2019-12-10 10:20:00|Action_A|
|ID1|   15|2019-12-10 10:23:00|Action_B|
|ID2|   23|2019-12-10 11:10:00|Action_A|
|ID2|   23|2019-12-10 11:11:00|Action_B|
|ID2|   23|2019-12-10 11:30:00|Action_A|
|ID2|   23|2019-12-10 11:35:00|Action_B|
+---+-----+-------------------+--------+

Once I have the data for each pair, I can sum across the group (ID, SeqID)

Expected output (could also be in seconds):

+---+-----+--------+
| ID|SeqID|Dur_Mins|
+---+-----+--------+
|ID1|   15|       8|
|ID2|   23|       6|
+---+-----+--------+

Upvotes: 3

Views: 5503

Answers (3)

04dev

Reputation: 1

Another possible solution, using window functions:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[3]").appName("TestApp").enableHiveSupport().getOrCreate()

data = [("ID1", 15, "2019-12-10 10:00:00", "Action_A"),
        ("ID1", 15, "2019-12-10 10:05:00", "Action_B"),
        ("ID1", 15, "2019-12-10 10:20:00", "Action_A"),
        ("ID1", 15, "2019-12-10 10:23:00", "Action_B"),
        ("ID2", 23, "2019-12-10 11:10:00", "Action_A"),
        ("ID2", 23, "2019-12-10 11:11:00", "Action_B"),
        ("ID2", 23, "2019-12-10 11:30:00", "Action_A"),
        ("ID2", 23, "2019-12-10 11:35:00", "Action_B")]

df = spark.createDataFrame(data, ["ID", "SeqID", "Timestamp", "Action"])

df.createOrReplaceTempView("tmpTbl")  # registerTempTable is deprecated since Spark 2.0

df = spark.sql("select *, lead(Timestamp,1) over (partition by ID,SeqID order by Timestamp) Next_Timestamp from tmpTbl")

updated_df = df.filter("Action != 'Action_B'")

final_df = updated_df.withColumn("diff", (F.unix_timestamp('Next_Timestamp') - F.unix_timestamp('Timestamp'))/F.lit(60))

final_df.groupBy("ID","SeqID").agg(F.sum(F.col("diff")).alias("Duration")).show()

Output:

+---+-----+--------+
| ID|SeqID|Duration|
+---+-----+--------+
|ID1|   15|     8.0|
|ID2|   23|     6.0|
+---+-----+--------+
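
The same window logic can also be written with the DataFrame API instead of a temp view. A minimal sketch, equivalent to the SQL above (starting again from the original df; result is just an illustrative name):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# next timestamp within each (ID, SeqID) group, ordered by time
w = Window.partitionBy("ID", "SeqID").orderBy("Timestamp")

result = (df
          .withColumn("Next_Timestamp", F.lead("Timestamp", 1).over(w))
          .filter(F.col("Action") != "Action_B")  # keep only Action_A rows
          .withColumn("diff", (F.unix_timestamp("Next_Timestamp")
                               - F.unix_timestamp("Timestamp")) / 60)
          .groupBy("ID", "SeqID")
          .agg(F.sum("diff").alias("Duration")))

result.show()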

Upvotes: 0

samkart

Reputation: 6644

A possible (if somewhat convoluted) way to do it with flatMapValues and the RDD API.

Using your data variable:

from pyspark.sql import functions as func

df = spark.createDataFrame(data, ["id", "seq_id", "ts", "action"]). \
    withColumn('ts', func.col('ts').cast('timestamp'))

# func to calculate the duration | applied on each row
def getDur(groupedrows):
    """
    """

    res = []

    for row in groupedrows:
        if row.action == 'Action_A':
            frst_ts = row.ts
            dur = 0
        elif row.action == 'Action_B':
            dur = (row.ts - frst_ts).total_seconds()

        res.append([val for val in row] + [float(dur)])

    return res

# run the rules on the base df | row by row
# grouped on ID, SeqID - sorted on timestamp
dur_rdd = df.rdd. \
    groupBy(lambda k: (k.id, k.seq_id)). \
    flatMapValues(lambda r: getDur(sorted(r, key=lambda ok: ok.ts))). \
    values()

# specify final schema
dur_schema = df.schema. \
    add('dur', 'float')

# convert to DataFrame
dur_sdf = spark.createDataFrame(dur_rdd, dur_schema)

dur_sdf.orderBy('id', 'seq_id', 'ts').show()

+---+------+-------------------+--------+-----+
| id|seq_id|                 ts|  action|  dur|
+---+------+-------------------+--------+-----+
|ID1|    15|2019-12-10 10:00:00|Action_A|  0.0|
|ID1|    15|2019-12-10 10:05:00|Action_B|300.0|
|ID1|    15|2019-12-10 10:20:00|Action_A|  0.0|
|ID1|    15|2019-12-10 10:23:00|Action_B|180.0|
|ID2|    23|2019-12-10 11:10:00|Action_A|  0.0|
|ID2|    23|2019-12-10 11:11:00|Action_B| 60.0|
|ID2|    23|2019-12-10 11:30:00|Action_A|  0.0|
|ID2|    23|2019-12-10 11:35:00|Action_B|300.0|
+---+------+-------------------+--------+-----+

# Your required data
dur_sdf.groupBy('id', 'seq_id'). \
    agg((func.sum('dur') / func.lit(60)).alias('dur_mins')). \
    show()

+---+------+--------+
| id|seq_id|dur_mins|
+---+------+--------+
|ID1|    15|     8.0|
|ID2|    23|     6.0|
+---+------+--------+

This fits the data you've described, but check whether it fits all your cases.
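
For example, if a group happened to start with an Action_B (no preceding Action_A), getDur would hit an undefined frst_ts. A sketch of a more defensive variant (not in the original answer; treating such orphan Action_B rows as zero duration is an assumption, adjust to your rules):

def getDurSafe(groupedrows):
    """
    Same as getDur, but tolerant of unpaired rows.
    """
    res = []
    frst_ts = None

    for row in groupedrows:
        if row.action == 'Action_A':
            frst_ts = row.ts
            dur = 0
        elif row.action == 'Action_B' and frst_ts is not None:
            dur = (row.ts - frst_ts).total_seconds()
            frst_ts = None  # consume the pair
        else:
            dur = 0  # orphan Action_B: assumed to contribute nothing

        res.append([val for val in row] + [float(dur)])

    return res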

Upvotes: 1

blackbishop

Reputation: 32660

Here is a possible solution using Higher-Order Functions (Spark >=2.4):

from pyspark.sql.functions import array_sort, col, collect_list, expr

transform_expr = "transform(ts_array, (x,i) -> (unix_timestamp(ts_array[i+1]) - unix_timestamp(x))/60 * ((i+1)%2))"

df.groupBy("ID", "SeqID").agg(array_sort(collect_list(col("Timestamp"))).alias("ts_array")) \
    .withColumn("transformed_ts_array", expr(transform_expr)) \
    .withColumn("Dur_Mins", expr("aggregate(transformed_ts_array, 0D, (acc, x) -> acc + coalesce(x, 0D))")) \
    .drop("transformed_ts_array", "ts_array") \
    .show(truncate=False)

Steps:

  1. Collect all timestamps into an array for each group (ID, SeqID) and sort it in ascending order.
  2. Transform the array with a lambda function (x, i) => Double, where x is the current element and i its index. For each timestamp in the array, we calculate the diff with the next timestamp, and multiply it by (i+1)%2 so that only the pairwise diffs survive (first with second, third with fourth, ...), since the actions always come in pairs.
  3. Finally, aggregate the transformed array to sum all its elements.

Output:

+---+-----+--------+
|ID |SeqID|Dur_Mins|
+---+-----+--------+
|ID1|15   |8.0     |
|ID2|23   |6.0     |
+---+-----+--------+
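
For completeness: on Spark 3.1+, the same transform/aggregate logic can also be written with the Python DataFrame API instead of SQL expression strings. A minimal sketch of that alternative (not part of the original answer; result is an illustrative name):

from pyspark.sql import functions as F

result = (df.groupBy("ID", "SeqID")
            .agg(F.array_sort(F.collect_list("Timestamp")).alias("ts_array"))
            # diff with the next timestamp, kept only for even indices (pairs)
            .withColumn("diffs", F.transform(
                "ts_array",
                lambda x, i: (F.unix_timestamp(F.col("ts_array")[i + 1])
                              - F.unix_timestamp(x)) / 60 * ((i + 1) % 2)))
            # sum the array, treating the trailing null diff as 0
            .withColumn("Dur_Mins", F.aggregate(
                "diffs", F.lit(0.0),
                lambda acc, x: acc + F.coalesce(x, F.lit(0.0))))
            .drop("diffs", "ts_array"))

result.show(truncate=False)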

Upvotes: 4
