Reputation: 23
I have data like this:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import substring, length
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

dept = [("A",1,"2020-11-07 23:19:12",50), ("A",1,"2020-11-07 23:19:16",42), ("A",1,"2020-11-07 23:19:56",30),
        ("A",0,"2020-11-07 23:20:37",30), ("A",0,"2020-11-07 23:21:06",35), ("A",0,"2020-11-07 23:21:47",42),
        ("A",1,"2020-11-07 23:22:05",42), ("A",1,"2020-11-07 23:22:30",30), ("A",1,"2020-11-07 23:23:00",22),
        ("A",0,"2020-11-07 23:23:20",50), ("A",0,"2020-11-07 23:23:40",52),
        ("A",1,"2020-11-07 23:24:10",52), ("A",1,"2020-11-07 23:25:50",45),
        ("B",1,"2020-11-07 22:19:12",40), ("B",1,"2020-11-07 22:20:10",32),
        ("B",0,"2020-11-07 22:21:31",32), ("B",0,"2020-11-07 22:22:01",40), ("B",0,"2020-11-07 22:22:45",45),
        ("B",1,"2020-11-07 22:23:52",45), ("B",1,"2020-11-07 22:24:10",30)]
deptColumns = ["Id", "BAP", "Time", "BAZ"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.show()
For each Id, I want to calculate the difference between the "BAZ" value at the first "1" of each series of consecutive "1"s in BAP and the "BAZ" value at the last "1" of that series. The expected result is something like this:
The final goal is to get the sum of "Delta" for each Id:
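For example, with the sample data above, the first series of "1"s for Id A runs from BAZ = 50 down to BAZ = 30, so its Delta is 50 - 30 = 20; summing the deltas of all series should then give 47 for A and 23 for B.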
Here's my code that I can't adjust to get the desired result:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

window = Window.partitionBy('Id').orderBy('Time')

Delta = deptDF.withColumn(
    'begin0',
    # row where BAP switches from 0 to 1
    (F.lag('BAP').over(window) == 0) & (F.col('BAP') == 1)
).withColumn(
    'begin1',
    # row where BAP switches from 1 to 0
    (F.lag('BAP').over(window) != 0) & (F.col('BAP') == 0)
).filter(
    'begin0 or begin1'
).withColumn(
    'Delta',
    F.when(
        F.col('BAP') == 1,
        # subtract the previous kept row's BAZ when BAP is 1
        F.col('BAZ').cast('bigint') - F.lag('BAZ').over(window).cast('bigint')
    )
).orderBy(
    'Id', 'Time'
)
Delta.show()
The subtraction is performed in ascending time order.
I need to modify this code to get the desired result. For the moment I can't find the right condition for selecting the first and last "1" of each series. I thought of the first and last functions but I can't find the right combination.
NB: in the case where we have a first "1" without a last "1" (an isolated 1), I have to put 0 as the subtraction result, without taking the next "1" that comes after the series of zeros, which would be subtracted erroneously. Here is an example to illustrate the problem: in a sequence like 1, 0, 0, 1, the isolated first "1" should get a Delta of 0 rather than the difference with the "1" that follows the zeros.
I have to avoid this problem:
Upvotes: 0
Views: 721
Reputation: 42352
You need to change the conditions a bit because it's different from your previous question (and my previous answer). You can use lead in addition to lag to achieve what you wanted.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

window = Window.partitionBy('Id').orderBy('Time')

Delta = deptDF.withColumn(
    'end1',
    # last "1" of a series: the next BAP (defaulting to 0) is 0
    (F.lead('BAP', 1, 0).over(window) == 0) & (F.col('BAP') == 1)
).withColumn(
    'begin1',
    # first "1" of a series: the previous BAP (defaulting to 0) is 0
    (F.lag('BAP', 1, 0).over(window) == 0) & (F.col('BAP') == 1)
).filter(
    'end1 or begin1'
).withColumn(
    'Delta',
    F.when(
        # isolated "1" (both first and last of its series): Delta is 0
        F.col('begin1') & F.col('end1'),
        0
    ).when(
        # first "1" of a series: subtract the BAZ of the next kept row,
        # which after the filter is the last "1" of the same series
        F.col('begin1'),
        F.col('BAZ') - F.lead('BAZ').over(window)
    ).otherwise(0)
).drop('begin1', 'end1', 'Time').orderBy('Id', 'Time')

Delta.show()
+---+---+---+-----+
| Id|BAP|BAZ|Delta|
+---+---+---+-----+
| A| 1| 50| 20|
| A| 1| 30| 0|
| A| 1| 42| 20|
| A| 1| 22| 0|
| A| 1| 52| 7|
| A| 1| 45| 0|
| B| 1| 40| 8|
| B| 1| 32| 0|
| B| 1| 45| 15|
| B| 1| 30| 0|
+---+---+---+-----+
And the final result can be obtained with a simple aggregation:
Delta.groupBy('id').sum('Delta').show()
+---+----------+
| id|sum(Delta)|
+---+----------+
| B| 23|
| A| 47|
+---+----------+
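As a side note, the first/last combination mentioned in the question can also work if you first assign a run id to each consecutive block of BAP values and then take the first and last BAZ inside every run of "1"s. Here is a minimal sketch of that alternative (the change and run_id column names are just illustrative, not part of the code above); on the sample data it should give the same per-Id sums:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('Id').orderBy('Time')

# a new run starts whenever BAP differs from the previous row; the running
# sum of those change flags gives every consecutive block of equal BAP
# values its own run_id
runs = deptDF.withColumn(
    'change',
    (F.lag('BAP', 1, -1).over(w) != F.col('BAP')).cast('int')
).withColumn(
    'run_id',
    F.sum('change').over(w)
).filter(F.col('BAP') == 1)

# deterministic first/last BAZ inside each run of "1"s, ordered by Time
run_w = (
    Window.partitionBy('Id', 'run_id')
    .orderBy('Time')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# an isolated "1" forms a run of length one, so first == last and Delta is 0
deltas = runs.withColumn(
    'Delta',
    F.first('BAZ').over(run_w) - F.last('BAZ').over(run_w)
).select('Id', 'run_id', 'Delta').distinct()

deltas.groupBy('Id').sum('Delta').show()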
Upvotes: 1