Reputation: 30182
For the given dataframe
df = spark.createDataFrame([
    ("2019-06-24T07:29:22.000+0000", "Image Quality: 75"),
    ("2019-06-25T07:29:22.000+0000", "Start scan"),
    ("2019-06-26T07:29:22.000+0000", "Image Quality: 95"),
    ("2019-06-27T07:29:22.000+0000", "Start scan"),
    ("2019-06-28T07:29:22.000+0000", "Start scan")
], ["ts", "message"])
I'm interested in engineering an image-quality feature, i.e. producing the following dataframe:
+----------------------------+----------+-------------+
|ts |message |image_quality|
+----------------------------+----------+-------------+
|2019-06-25T07:29:22.000+0000|Start scan|75 |
|2019-06-27T07:29:22.000+0000|Start scan|95 |
|2019-06-28T07:29:22.000+0000|Start scan|95 |
+----------------------------+----------+-------------+
I've attempted various combinations of window functions and subqueries, but nothing has led to a workable solution.
Upvotes: 0
Views: 73
Reputation: 2655
IIUC, you want to carry the last available image quality forward until the next one appears. You can try something like the following using a Window:
Assumption from the given dataset: the data always begins with an Image Quality: <some value> row, which is then followed by Start scan rows.
Imports and dataset preparation:
# Import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as f
df.show(10, False)
+----------------------------+-----------------+
|ts |message |
+----------------------------+-----------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75|
|2019-06-25T07:29:22.000+0000|Start scan |
|2019-06-26T07:29:22.000+0000|Image Quality: 95|
|2019-06-27T07:29:22.000+0000|Start scan |
|2019-06-28T07:29:22.000+0000|Start scan |
+----------------------------+-----------------+
Now split the message column on the ':' separator to create an image_quality column:
# take the text after ':'; the value arrives as a string with a leading space
df1 = df.withColumn('image_quality', f.split('message', ':')[1])
df1.show(10, False)
+----------------------------+-----------------+-------------+
|ts |message |image_quality|
+----------------------------+-----------------+-------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75| 75 |
|2019-06-25T07:29:22.000+0000|Start scan |null |
|2019-06-26T07:29:22.000+0000|Image Quality: 95| 95 |
|2019-06-27T07:29:22.000+0000|Start scan |null |
|2019-06-28T07:29:22.000+0000|Start scan |null |
+----------------------------+-----------------+-------------+
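As the output shows, split() leaves the value as a string with a leading space. If a numeric feature is the end goal, a trim-and-cast variant (a small sketch, same logic otherwise) could be used instead:

# optional variant: strip the whitespace and cast to an integer
df1 = df.withColumn('image_quality', f.trim(f.split('message', ':')[1]).cast('int'))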
Define a window ordered by the ts column:
Note: since the focus here is the approach, no partitionBy column was added, but when possible you should always include one. Without it, Spark also emits the warning below:
WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
w_spec = Window.orderBy('ts')
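Note that with an orderBy and no explicit frame, the window defaults to a frame running from unbounded preceding up to the current row, which is exactly what a forward fill needs. If the data carried a grouping key (say a hypothetical scanner_id column), partitioning would avoid the warning above:

# scanner_id is illustrative; it does not exist in the sample data
w_spec = Window.partitionBy('scanner_id').orderBy('ts')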
Prepare the final dataset:
Now, apply the window and fetch the last available image quality using last('image_quality', True); the second argument True makes last() ignore null values. Then filter the records, keeping only the Start scan rows and dropping the Image Quality ones.
# coalesce keeps the row's own value; last(..., True) carries forward the
# most recent non-null value seen so far in the window
final_df = df1.withColumn('image_quality', f.coalesce('image_quality', f.last('image_quality', True).over(w_spec))) \
    .where(df1.message == 'Start scan')
final_df.show()
+--------------------+----------+-------------+
| ts| message|image_quality|
+--------------------+----------+-------------+
|2019-06-25T07:29:...|Start scan| 75|
|2019-06-27T07:29:...|Start scan| 95|
|2019-06-28T07:29:...|Start scan| 95|
+--------------------+----------+-------------+
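For completeness, the same forward fill can be expressed in Spark SQL, where last() also accepts an ignore-nulls flag (a sketch against the df1 dataframe from above):

df1.createOrReplaceTempView('events')
spark.sql("""
    SELECT ts, message,
           last(image_quality, true) OVER (ORDER BY ts) AS image_quality
    FROM events
""").where("message = 'Start scan'").show()

One caveat: if a Start scan row ever appeared before any Image Quality row (violating the assumption above), there would be no prior value to carry forward and image_quality would remain null for that row.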
Upvotes: 1