Maxim Veksler

Reputation: 30182

Feature engineering: carry forward the last occurring value

For the given dataframe

df = spark.createDataFrame([
  ("2019-06-24T07:29:22.000+0000", "Image Quality: 75"),
  ("2019-06-25T07:29:22.000+0000", "Start scan"),
  ("2019-06-26T07:29:22.000+0000", "Image Quality: 95"),
  ("2019-06-27T07:29:22.000+0000", "Start scan"),
  ("2019-06-28T07:29:22.000+0000", "Start scan")  
], ["ts", "message"])

I would like to engineer an image_quality feature, i.e. produce the following dataframe:

+----------------------------+----------+-------------+
|ts                          |message   |image_quality|
+----------------------------+----------+-------------+
|2019-06-25T07:29:22.000+0000|Start scan|75           |
|2019-06-27T07:29:22.000+0000|Start scan|95           |
|2019-06-28T07:29:22.000+0000|Start scan|95           |
+----------------------------+----------+-------------+

I've tried various combinations of window functions and subqueries, but none of them has led to a workable solution.

Upvotes: 0

Views: 73

Answers (1)

SMaZ

Reputation: 2655

IIUC, you want to carry the last available image quality forward until the next one appears.

You can try something like the following, using a Window:

Assumption based on the given dataset: for any date, the sequence always starts with Image Quality: <some value> and is followed by Start scan.

Imports and prepare dataset:

# Import Window
from pyspark.sql.window import Window
import pyspark.sql.functions as f

df.show(10, False)
+----------------------------+-----------------+
|ts                          |message          |
+----------------------------+-----------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75|
|2019-06-25T07:29:22.000+0000|Start scan       |
|2019-06-26T07:29:22.000+0000|Image Quality: 95|
|2019-06-27T07:29:22.000+0000|Start scan       |
|2019-06-28T07:29:22.000+0000|Start scan       |
+----------------------------+-----------------+

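Now split the message on the : separator to create the image_quality column. Rows without a : get null. Note that the extracted value keeps the leading space (' 75'); wrap the expression in f.trim if that matters.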

df1 = df.withColumn('image_quality', f.split('message', ':')[1])
df1.show(10, False)
+----------------------------+-----------------+-------------+
|ts                          |message          |image_quality|
+----------------------------+-----------------+-------------+
|2019-06-24T07:29:22.000+0000|Image Quality: 75| 75          |
|2019-06-25T07:29:22.000+0000|Start scan       |null         |
|2019-06-26T07:29:22.000+0000|Image Quality: 95| 95          |
|2019-06-27T07:29:22.000+0000|Start scan       |null         |
|2019-06-28T07:29:22.000+0000|Start scan       |null         |
+----------------------------+-----------------+-------------+
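
To see what this step yields per row without a Spark session, here is a plain-Python sketch. parse_quality is a hypothetical helper, not part of PySpark, and unlike the split above it also strips the leading space and converts the value to an int.

```python
from typing import Optional

def parse_quality(message: str) -> Optional[int]:
    """Mimic f.split('message', ':')[1], plus trimming and an int conversion."""
    parts = message.split(':')
    if len(parts) < 2:
        # No ':' in the message, so there is no second element.
        # This is the case that shows up as null in the table above.
        return None
    value = parts[1].strip()  # ' 75' -> '75'
    return int(value) if value.isdigit() else None

messages = ["Image Quality: 75", "Start scan", "Image Quality: 95"]
print([parse_quality(m) for m in messages])  # [75, None, 95]
```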

Define a window ordered by the ts column.

Note: since the focus here is the approach, no partitionBy column is used. On real data, add a partitionBy column whenever possible.

Without one, Spark logs the following warning:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

w_spec = Window.orderBy('ts')
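
With a partition column the spec would look like Window.partitionBy('device_id').orderBy('ts'), where device_id is an assumed column that the sample data does not have. The plain-Python sketch below only illustrates what partitioning means for the fill: the "last seen" value is tracked separately per key. fill_per_partition is a hypothetical helper, not a Spark API.

```python
from typing import Dict, List, Optional, Tuple

# (device_id, ts, image_quality); device_id is a hypothetical partition column.
Row = Tuple[str, str, Optional[str]]

def fill_per_partition(rows: List[Row]) -> List[Row]:
    """Forward-fill image_quality within each device_id, ordered by ts."""
    last_seen: Dict[str, str] = {}
    filled = []
    # Sort by partition key, then by timestamp inside each partition.
    for device_id, ts, quality in sorted(rows, key=lambda r: (r[0], r[1])):
        if quality is not None:
            last_seen[device_id] = quality
        filled.append((device_id, ts, last_seen.get(device_id)))
    return filled

rows = [
    ("a", "2019-06-24", "75"),
    ("b", "2019-06-24", "60"),
    ("a", "2019-06-25", None),
    ("b", "2019-06-25", None),
]
for row in fill_per_partition(rows):
    print(row)
```

Device "a" is filled with 75 and device "b" with 60; a value never leaks from one key to the other, which is also what lets Spark process the partitions independently.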

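Prepare the final dataset:

Now apply the window and take the last available image quality with f.last('image_quality', True); the second argument (ignorenulls=True) skips null values. Because the window is ordered and has no explicit frame, it spans from the first row up to the current row, so each row receives the most recent non-null value seen so far.

Then keep only the rows where message == 'Start scan', which drops the Image Quality rows.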

final_df = df1.withColumn('image_quality', f.coalesce('image_quality', f.last('image_quality', True).over(w_spec))) \
            .where(df1.message == 'Start scan')

final_df.show()

+--------------------+----------+-------------+
|                  ts|   message|image_quality|
+--------------------+----------+-------------+
|2019-06-25T07:29:...|Start scan|           75|
|2019-06-27T07:29:...|Start scan|           95|
|2019-06-28T07:29:...|Start scan|           95|
+--------------------+----------+-------------+
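
The whole pipeline can be sanity-checked outside Spark. Below is a minimal plain-Python sketch, assuming the rows are (ts, message) tuples and that all timestamps share the same offset so they sort chronologically as strings. carry_forward_quality is a hypothetical name used only for illustration.

```python
from typing import List, Optional, Tuple

def carry_forward_quality(
    rows: List[Tuple[str, str]]
) -> List[Tuple[str, str, Optional[str]]]:
    """Attach the most recent image quality to every 'Start scan' row."""
    last_quality: Optional[str] = None
    result = []
    for ts, message in sorted(rows, key=lambda r: r[0]):  # same role as orderBy('ts')
        if message.startswith("Image Quality:"):
            # Remember the value; these rows are dropped from the output.
            last_quality = message.split(":", 1)[1].strip()
        elif message == "Start scan":
            result.append((ts, message, last_quality))
    return result

rows = [
    ("2019-06-24T07:29:22.000+0000", "Image Quality: 75"),
    ("2019-06-25T07:29:22.000+0000", "Start scan"),
    ("2019-06-26T07:29:22.000+0000", "Image Quality: 95"),
    ("2019-06-27T07:29:22.000+0000", "Start scan"),
    ("2019-06-28T07:29:22.000+0000", "Start scan"),
]
for row in carry_forward_quality(rows):
    print(row)
```

The three printed rows carry the values 75, 95 and 95, matching the table above. A Start scan row that appears before any Image Quality row would get None, which corresponds to a null in the Spark result.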

Upvotes: 1
