dumdum

Reputation: 87

Pyspark conditionally replace value in column with value from another column

I am working with weather data in which missing values are marked with a sentinel code. For example, if SLP data is missing, it is assigned the code 99999. I was able to use a window function to calculate a 7-day average and save it as a new column. A heavily reduced example, showing a single row, is below:

SLP_ORIGIN SLP_ORIGIN_7DAY_AVG
99999 11945.823516044207

I'm trying to write code such that when SLP_ORIGIN holds the missing code, it is replaced with the SLP_ORIGIN_7DAY_AVG value from the same row. However, most examples I found show how to conditionally replace a column value with a constant, not with a value from another column. I tried the following:

from pyspark.sql.functions import when

train_impute = train.withColumn("SLP_ORIGIN",
    when(train["SLP_ORIGIN"] == 99999, train["SLP_ORIGIN_7DAY_AVG"]).otherwise(train["SLP_ORIGIN"]))

where the dataframe is called train.

When I count the missing codes in the SLP_ORIGIN column using train.where("SLP_ORIGIN = 99999").count(), I get the same count as before I attempted the replacement. I have already checked that SLP_ORIGIN_7DAY_AVG does not contain any values that match the missing code.

So how do I actually replace the 99999 values in the SLP_ORIGIN column with the associated SLP_ORIGIN_7DAY_AVG value?

Even better, is there a way to do the replacement and the window calculation without creating a separate 7-day average column? I have other variables that need the same treatment, so I'm hoping there is a more efficient way to do this.

Upvotes: 0

Views: 1637

Answers (2)

Daniel Moraite

Reputation: 516

It could also work as follows, without averaging or anything else: just replace the values of one column with those of another when a condition holds.

data = data.withColumn(
    "replaced",
    when(data["rain_values"] < data["rain_median_values"], data["rain_median_values"]).otherwise(
        data["rain_values"]
    ),
)

Upvotes: 0

dumdum

Reputation: 87

Make sure to double-check which DataFrame you are verifying against.

I was using train.where("SLP_ORIGIN = 99999").count() when I should have been using train_impute.where("SLP_ORIGIN = 99999").count(). withColumn returns a new DataFrame and leaves train unchanged.

Additionally, instead of creating a whole new column to store the 7-day average, one can compute the average only for rows where the missing value code is present (here f is pyspark.sql.functions and w is the window specification used for the 7-day average):

train = train.withColumn("SLP_ORIGIN", when(train["SLP_ORIGIN"] == 99999, f.avg("SLP_ORIGIN").over(w)).otherwise(train["SLP_ORIGIN"]))

Upvotes: 2
