Data Tao

Reputation: 23

Compute the difference between integer values according to a condition with PySpark

I have data like this:

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

dept = [
    ("A", 1, "2020-11-07 23:19:12", 50), ("A", 1, "2020-11-07 23:19:16", 42),
    ("A", 1, "2020-11-07 23:19:56", 30), ("A", 0, "2020-11-07 23:20:37", 30),
    ("A", 0, "2020-11-07 23:21:06", 35), ("A", 0, "2020-11-07 23:21:47", 42),
    ("A", 1, "2020-11-07 23:22:05", 42), ("A", 1, "2020-11-07 23:22:30", 30),
    ("A", 1, "2020-11-07 23:23:00", 22), ("A", 0, "2020-11-07 23:23:20", 50),
    ("A", 0, "2020-11-07 23:23:40", 52), ("A", 1, "2020-11-07 23:24:10", 52),
    ("A", 1, "2020-11-07 23:25:50", 45), ("B", 1, "2020-11-07 22:19:12", 40),
    ("B", 1, "2020-11-07 22:20:10", 32), ("B", 0, "2020-11-07 22:21:31", 32),
    ("B", 0, "2020-11-07 22:22:01", 40), ("B", 0, "2020-11-07 22:22:45", 45),
    ("B", 1, "2020-11-07 22:23:52", 45), ("B", 1, "2020-11-07 22:24:10", 30),
]
deptColumns = ["Id", "BAP", "Time", "BAZ"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.show()

(screenshot of the deptDF.show() output)

For each Id, and following ascending Time, I want to compute the difference between the "BAZ" value at the first 1 and the "BAZ" value at the last 1 of each consecutive run of 1s in "BAP" (for Id A, the first run of 1s has BAZ values 50, 42, 30, so its difference is 50 - 30 = 20). The expected result is something like this:

(screenshot: expected result, a table with a Delta value per run of 1s)

The final goal is to get the sum of "Delta" for each Id:

(screenshot: expected final result, the sum of Delta per Id)

Here's my code, which I can't adjust to get the desired result:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

window = Window.partitionBy('Id').orderBy('Time')

Delta = deptDF.withColumn(
    'begin0',
    # a 0 -> 1 transition: the previous row is 0 and the current row is 1
    (F.lag('BAP').over(window) == 0) & (F.col('BAP') == 1)
).withColumn(
    'begin1',
    # a 1 -> 0 transition: the previous row is not 0 and the current row is 0
    (F.lag('BAP').over(window) != 0) & (F.col('BAP') == 0)
).filter(
    'begin0 or begin1'
).withColumn(
    'Delta',
    F.when(
        F.col('BAP') == 1,
        F.col('BAZ').cast('bigint') - F.lag('BAZ').over(window).cast('bigint')
    )
).orderBy('Id', 'Time')

Delta.show()

The subtraction is performed following ascending time order.

I need to modify this code to get the desired result. So far I haven't found the right condition to select the first and last "1" of each run. I thought of the first and last functions, but I can't find the right combination.

NB: when a run has a first "1" without a distinct last "1" (only a single 1), the subtraction result must be 0; the code must not pick up the next "1" that comes after the series of zeros, which would be subtracted erroneously. Here is an example to illustrate the problem:

(screenshot illustrating a run that contains only a single 1)

I have to avoid this problem:

(screenshot of the erroneous subtraction to avoid)
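For reference, one way to make the first/last idea concrete is to label each consecutive run of 1s and aggregate per run. Below is a minimal sketch; the run-labelling columns new_run and run_id are illustrative names, and min_by/max_by require Spark 3.3+:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('Id').orderBy('Time')

# Label consecutive runs of equal BAP: a new run starts whenever BAP
# differs from the previous row (or there is no previous row).
runs = deptDF.withColumn(
    'new_run',
    F.when(F.lag('BAP').over(w) == F.col('BAP'), 0).otherwise(1)
).withColumn('run_id', F.sum('new_run').over(w))

# Within each run of 1s: Delta = BAZ at the earliest Time minus BAZ at
# the latest Time; a run with a single 1 yields min_by == max_by, hence 0.
deltas = (runs.filter(F.col('BAP') == 1)
              .groupBy('Id', 'run_id')
              .agg((F.min_by('BAZ', 'Time') - F.max_by('BAZ', 'Time')).alias('Delta')))

deltas.groupBy('Id').sum('Delta').show()

A single-1 run gives 0 automatically here, which is exactly the behaviour the NB asks for.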

Upvotes: 0

Views: 721

Answers (1)

mck

Reputation: 42352

You need to change the conditions a bit because this is different from your previous question (and my previous answer). You can use lead in addition to lag to achieve what you want.
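Before the full solution, a quick illustration of what the third argument to lag/lead does: it supplies the value returned where no previous/next row exists, so runs touching the partition edges are still detected. This mini-frame is hypothetical, not part of the original data:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

demo = spark.createDataFrame(
    [('A', 1, 1), ('A', 2, 1), ('A', 3, 0), ('A', 4, 1)],
    ['Id', 'Time', 'BAP']
)
w = Window.partitionBy('Id').orderBy('Time')
demo.select(
    'Time', 'BAP',
    F.lag('BAP', 1, 0).over(w).alias('prev_bap'),   # 0 on the first row
    F.lead('BAP', 1, 0).over(w).alias('next_bap'),  # 0 on the last row
).show()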

import pyspark.sql.functions as F
from pyspark.sql.window import Window

window = Window.partitionBy('Id').orderBy('Time')

Delta = deptDF.withColumn(
    'end1',
    # last 1 of a run: the current row is 1 and the next row's BAP is 0
    # (the default 0 also marks the final row of the partition as an end)
    (F.lead('BAP', 1, 0).over(window) == 0) & (F.col('BAP') == 1)
).withColumn(
    'begin1',
    # first 1 of a run: the current row is 1 and the previous row's BAP is 0
    # (the default 0 also marks the first row of the partition as a begin)
    (F.lag('BAP', 1, 0).over(window) == 0) & (F.col('BAP') == 1)
).filter(
    'end1 or begin1'
).withColumn(
    'Delta',
    F.when(
        # a single-1 run is both its own begin and end: Delta is 0
        F.col('begin1') & F.col('end1'),
        0
    ).when(
        # at a run's first 1, subtract the run's last BAZ, which is the
        # next remaining row after the filter
        F.col('begin1'),
        F.col('BAZ') - F.lead('BAZ').over(window)
    ).otherwise(0)
).drop('begin1', 'end1', 'Time').orderBy('Id', 'Time')

Delta.show()
+---+---+---+-----+
| Id|BAP|BAZ|Delta|
+---+---+---+-----+
|  A|  1| 50|   20|
|  A|  1| 30|    0|
|  A|  1| 42|   20|
|  A|  1| 22|    0|
|  A|  1| 52|    7|
|  A|  1| 45|    0|
|  B|  1| 40|    8|
|  B|  1| 32|    0|
|  B|  1| 45|   15|
|  B|  1| 30|    0|
+---+---+---+-----+

And the final outcome can be done using a simple aggregation:

Delta.groupBy('id').sum('Delta').show()
+---+----------+
| id|sum(Delta)|
+---+----------+
|  B|        23|
|  A|        47|
+---+----------+
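If a single overall total is wanted instead of one sum per Id, an ungrouped aggregation does it (a small sketch; the alias total_delta is illustrative):

# Sum Delta across all Ids: 47 + 23 = 70 for this data
Delta.agg(F.sum('Delta').alias('total_delta')).show()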

Upvotes: 1
