Reputation: 407
I have a table like this:
EventID | EventTime | AttrA | AttrB |
---|---|---|---|
1 | 2022-10-01 00:00:01.000000 | null | null |
1 | 2022-10-01 00:00:02.000000 | a | null |
1 | 2022-10-01 00:00:03.000000 | b | 1 |
1 | 2022-10-01 00:00:04.000000 | null | null |
2 | 2022-10-01 00:01:01.000000 | aa | 11 |
2 | 2022-10-01 00:01:02.000000 | bb | null |
2 | 2022-10-01 00:01:03.000000 | null | null |
2 | 2022-10-01 00:01:04.000000 | aa | 22 |
and I want to scan across the records to return the first and last non-null AttrA and AttrB values for each EventID, ordered by EventTime. Each EventID can have multiple records, so we can't know in advance where the non-null values may be. So the desired result would be:
EventID | FirstAttrA | LastAttrA | FirstAttrB | LastAttrB |
---|---|---|---|---|
1 | a | b | 1 | 1 |
2 | aa | aa | 11 | 22 |
What I did is add row_number() OVER (PARTITION BY event_id ORDER BY event_time ASC), and then again with DESC, and then build multiple CTEs, like this:
WITH enhanced_table AS
(
  SELECT
    eventID,
    attrA,
    attrB,
    row_number() OVER (PARTITION BY eventID ORDER BY eventTime ASC) AS rn,
    row_number() OVER (PARTITION BY eventID ORDER BY eventTime DESC) AS reversed_rn
  FROM events  -- the source table
),
first_events_with_attrA AS
(
  SELECT
    eventID,
    FIRST(attrA) OVER (PARTITION BY eventID ORDER BY rn ASC) AS first_attrA
  FROM enhanced_table
  WHERE attrA IS NOT NULL
)...
But I need one CTE that scans the table again for each case I want (4 CTEs in total for this example), so the full query ends up roughly like the sketch below. It works, but it is slow.
Is there a way to grab the values I am interested in more efficiently?
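A simplified sketch of that full query (the source table name events and the exact CTE shapes are illustrative; each first/last CTE re-reads enhanced_table):

WITH enhanced_table AS
(
  SELECT
    eventID,
    attrA,
    attrB,
    row_number() OVER (PARTITION BY eventID ORDER BY eventTime ASC) AS rn,
    row_number() OVER (PARTITION BY eventID ORDER BY eventTime DESC) AS reversed_rn
  FROM events
),
first_events_with_attrA AS
(
  SELECT DISTINCT eventID,
    FIRST(attrA) OVER (PARTITION BY eventID ORDER BY rn ASC) AS first_attrA
  FROM enhanced_table
  WHERE attrA IS NOT NULL
),
last_events_with_attrA AS
(
  SELECT DISTINCT eventID,
    FIRST(attrA) OVER (PARTITION BY eventID ORDER BY reversed_rn ASC) AS last_attrA
  FROM enhanced_table
  WHERE attrA IS NOT NULL
),
first_events_with_attrB AS
(
  SELECT DISTINCT eventID,
    FIRST(attrB) OVER (PARTITION BY eventID ORDER BY rn ASC) AS first_attrB
  FROM enhanced_table
  WHERE attrB IS NOT NULL
),
last_events_with_attrB AS
(
  SELECT DISTINCT eventID,
    FIRST(attrB) OVER (PARTITION BY eventID ORDER BY reversed_rn ASC) AS last_attrB
  FROM enhanced_table
  WHERE attrB IS NOT NULL
)
SELECT
  f_a.eventID,
  f_a.first_attrA,
  l_a.last_attrA,
  f_b.first_attrB,
  l_b.last_attrB
FROM first_events_with_attrA f_a
JOIN last_events_with_attrA  l_a ON l_a.eventID = f_a.eventID
JOIN first_events_with_attrB f_b ON f_b.eventID = f_a.eventID
JOIN last_events_with_attrB  l_b ON l_b.eventID = f_a.eventID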
Upvotes: 0
Views: 41
Reputation: 5032
There is no need to build row numbers; you can directly use the native Spark SQL functions FIRST and LAST with isIgnoreNull set to True to achieve the intended results:
# imports assumed by this snippet
from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, StringType, DoubleType)

sql = SparkSession.builder.getOrCreate()   # SparkSession, referenced as `sql` below

# sample data
s = StringIO("""
EventID,EventTime,AttrA,AttrB
1,2022-10-01 00:00:01.000000,,
1,2022-10-01 00:00:02.000000,a,
1,2022-10-01 00:00:03.000000,b,1
1,2022-10-01 00:00:04.000000,,
2,2022-10-01 00:01:01.000000,aa,11
2,2022-10-01 00:01:02.000000,bb,
2,2022-10-01 00:01:03.000000,,
2,2022-10-01 00:01:04.000000,aa,22
"""
)

# target schema for the Spark DataFrame
inp_schema = StructType([
     StructField('EventID',IntegerType(),True)
    ,StructField('EventTime',StringType(),True)
    ,StructField('AttrA',StringType(),True)
    ,StructField('AttrB',DoubleType(),True)
  ]
)

df = pd.read_csv(s,delimiter=',')

# pandas reads the blank fields as NaN; convert those to proper nulls
sparkDF = sql.createDataFrame(df,schema=inp_schema)\
            .withColumn('AttrA',F.when(F.isnan(F.col('AttrA')),None).otherwise(F.col('AttrA')))\
            .withColumn('AttrB',F.when(F.isnan(F.col('AttrB')),None).otherwise(F.col('AttrB')))

sparkDF.show(truncate=False)
+-------+--------------------------+-----+-----+
|EventID|EventTime |AttrA|AttrB|
+-------+--------------------------+-----+-----+
|1 |2022-10-01 00:00:01.000000|null |null |
|1 |2022-10-01 00:00:02.000000|a |null |
|1 |2022-10-01 00:00:03.000000|b |1.0 |
|1 |2022-10-01 00:00:04.000000|null |null |
|2 |2022-10-01 00:01:01.000000|aa |11.0 |
|2 |2022-10-01 00:01:02.000000|bb |null |
|2 |2022-10-01 00:01:03.000000|null |null |
|2 |2022-10-01 00:01:04.000000|aa |22.0 |
+-------+--------------------------+-----+-----+
sparkDF.registerTempTable("INPUT")   # createOrReplaceTempView("INPUT") is the non-deprecated equivalent
sql.sql("""
SELECT
EventID,
FIRST(AttrA,True) as First_AttrA,
LAST(AttrA,True) as Last_AttrA,
FIRST(AttrB,True) as First_AttrB,
LAST(AttrB,True) as Last_AttrB
FROM INPUT
GROUP BY 1
""").show()
+-------+-----------+----------+-----------+----------+
|EventID|First_AttrA|Last_AttrA|First_AttrB|Last_AttrB|
+-------+-----------+----------+-----------+----------+
| 1| a| b| 1.0| 1.0|
| 2| aa| aa| 11.0| 22.0|
+-------+-----------+----------+-----------+----------+
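The same aggregation can also be expressed with the DataFrame API instead of SQL; a minimal sketch reusing the sparkDF built above (F.first and F.last accept an ignorenulls flag):

# group by event and keep the first/last non-null value per attribute
result = sparkDF.groupBy('EventID').agg(
    F.first('AttrA', ignorenulls=True).alias('First_AttrA'),
    F.last('AttrA', ignorenulls=True).alias('Last_AttrA'),
    F.first('AttrB', ignorenulls=True).alias('First_AttrB'),
    F.last('AttrB', ignorenulls=True).alias('Last_AttrB')
)

result.show()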
Upvotes: 1