Cazau

Reputation: 13

How to replace text in a column with the values contained in the columns named in that text

In PySpark, I'm trying to replace multiple text fragments in a column with the values of the columns whose names appear in the calc column (a formula).

So to be clear, here is an example:

Input:

|param_1|param_2|calc           |
|-------|-------|---------------|
|Cell 1 |Cell 2 |param_1-param_2|
|Cell 3 |Cell 4 |param_2/param_1|

Output needed:

|param_1|param_2|calc         |
|-------|-------|-------------|
|Cell 1 |Cell 2 |Cell 1-Cell 2|
|Cell 3 |Cell 4 |Cell 4/Cell 3|

In the calc column, the value is always a formula. It can be as simple as the ones above, or something like "2*(param_8-param_4)/param_2-(param_3/param_7)". What I'm looking for is a way to substitute every param_x with the value of the column of the same name.

I've tried a lot of things, but nothing works; most of the time, when I use replace or regexp_replace with a column as the replacement value, I get a "Column is not iterable" error.
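For example, an attempt like this (a sketch of one of my tries) fails with that error, apparently because on my Spark version regexp_replace only accepts plain strings for the pattern and replacement arguments:

    from pyspark.sql import functions as F

    # Raises "TypeError: Column is not iterable" on older Spark versions,
    # where the replacement argument must be a plain string, not a Column
    df = df.withColumn("calc", F.regexp_replace("calc", "param_1", F.col("param_1")))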

Moreover, the columns param_1, param_2, ..., param_x are generated dynamically, and the calc column values can reference some of these columns, but not necessarily all of them.

Could you help me with a dynamic solution?

Thank you so much. Best regards

Upvotes: 1

Views: 93

Answers (1)

Ronak Jain

Reputation: 3348

Update: It turned out I had misunderstood the requirement. Looping over every column name and substituting it into calc with regexp_replace works:

for exp in ["regexp_replace(calc, '"+col+"', "+col+")" for col in df.schema.names]:
   df=df.withColumn("calc", F.expr(exp))
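For example, with the question's sample data (a quick sketch; F is pyspark.sql.functions as imported above):

    # Reproduction of the question's input
    df = spark.createDataFrame([["Cell 1", "Cell 2", "param_1-param_2"], ["Cell 3", "Cell 4", "param_2/param_1"]]).toDF("param_1", "param_2", "calc")

    for exp in ["regexp_replace(calc, '" + col + "', " + col + ")" for col in df.schema.names]:
        df = df.withColumn("calc", F.expr(exp))

    df.show()  # calc is now "Cell 1-Cell 2" and "Cell 4/Cell 3"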

Yet another update: to handle null values, add coalesce (if a column's value is null, regexp_replace returns null, so fall back to the original calc):

for exp in ["coalesce(regexp_replace(calc, '"+col+"', "+col+"), calc)" for col in df.schema.names]:
   df=df.withColumn("calc", F.expr(exp))
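One caveat worth noting (not part of the original answer): regexp_replace matches substrings, so with many parameters a name like param_1 would also match the first half of param_10. A sketch of a guard is to substitute longer names first:

    # Hypothetical guard: replace longer column names first so that e.g.
    # param_10 is substituted before param_1 can match its prefix
    for col in sorted(df.schema.names, key=len, reverse=True):
        df = df.withColumn("calc", F.expr("regexp_replace(calc, '" + col + "', " + col + ")"))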


------- Keeping the below section for a while just for reference -------

You can't do that directly, as you can't use a column's value inside an expression unless you collect it into a Python object (which is generally not recommended).

The following achieves the same by collecting the formulas and building a single CASE expression; note that it evaluates each formula numerically rather than substituting the text:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    df = spark.createDataFrame([["1", "2", "param_1 - param_2"], ["3", "4", "2*param_1 + param_2"]]).toDF("param_1", "param_2", "calc")

    df.show()

    # Number the rows so each formula can be matched back to its row
    df = df.withColumn("row_num", F.row_number().over(Window.orderBy(F.lit("dummy"))))

    # Collect the formulas into a Python dict keyed by row number
    as_dict = {row.asDict()["row_num"]: row.asDict()["calc"] for row in df.select("row_num", "calc").collect()}

    # Build one CASE expression that evaluates each row's formula
    expression = f"""CASE {' '.join([f"WHEN row_num = '{k}' THEN ({v})" for k, v in as_dict.items()])} ELSE NULL END"""

    df.withColumn("Result", F.expr(expression)).show()


Upvotes: 1
