Reputation: 13
In PySpark, I'm trying to replace, inside a column, several column-name tokens by the values of the columns those names refer to; the names appear in the calc column as a formula.
So to be clear, here is an example:
Input:
|param_1|param_2|calc           |
|-------|-------|---------------|
|Cell 1 |Cell 2 |param_1-param_2|
|Cell 3 |Cell 4 |param_2/param_1|
Output needed:
|param_1|param_2|calc         |
|-------|-------|-------------|
|Cell 1 |Cell 2 |Cell 1-Cell 2|
|Cell 3 |Cell 4 |Cell 4/Cell 3|
In the calc column, the default value is a formula. It can be as simple as the ones above, or something like "2*(param_8-param_4)/param_2-(param_3/param_7)". What I'm looking for is a way to substitute every param_x in the formula with the value of the column of the same name.
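For reference, the sample input can be built like this (a minimal sketch; it assumes an existing SparkSession named spark):

df = spark.createDataFrame(
    [("Cell 1", "Cell 2", "param_1-param_2"),
     ("Cell 3", "Cell 4", "param_2/param_1")],
    ["param_1", "param_2", "calc"],
)
df.show(truncate=False)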
I've tried a lot of things, but nothing works; most of the time, when I use replace or regexp_replace with a column as the replacement value, I get the error "Column is not iterable".
Moreover, the columns param_1, param_2, ..., param_x are generated dynamically, and a calc value may reference some of these columns but not necessarily all of them.
Could you help me with a dynamic solution?
Thank you so much. Best regards
Upvotes: 1
Views: 93
Reputation: 3348
Update: it turned out I had misunderstood the requirement. This works:
from pyspark.sql import functions as F

# Replace each column name occurring in `calc` with that column's value
for exp in ["regexp_replace(calc, '" + col + "', " + col + ")" for col in df.schema.names]:
    df = df.withColumn("calc", F.expr(exp))
Yet another update: to handle null values, add coalesce:
# coalesce falls back to the original calc when a parameter column is null
for exp in ["coalesce(regexp_replace(calc, '" + col + "', " + col + "), calc)" for col in df.schema.names]:
    df = df.withColumn("calc", F.expr(exp))
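Applied to the sample data from the question, this gives the expected result (a quick sketch reusing the example values; it assumes a SparkSession named spark):

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [("Cell 1", "Cell 2", "param_1-param_2"),
     ("Cell 3", "Cell 4", "param_2/param_1")],
    ["param_1", "param_2", "calc"],
)

for exp in ["coalesce(regexp_replace(calc, '" + col + "', " + col + "), calc)" for col in df.schema.names]:
    df = df.withColumn("calc", F.expr(exp))

# calc now holds "Cell 1-Cell 2" and "Cell 4/Cell 3"
df.show(truncate=False)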
------- Keeping the below section for a while just for reference -------
You can't do that directly, since you can't use a column's value inside an expression unless you collect it into a Python object (which is obviously not recommended).
This achieves the same result:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame([["1", "2", "param_1 - param_2"], ["3", "4", "2*param_1 + param_2"]]).toDF("param_1", "param_2", "calc")
df.show()

# Number the rows, then collect the formulas to the driver as {row_num: formula}
df = df.withColumn("row_num", F.row_number().over(Window.orderBy(F.lit("dummy"))))
as_dict = {row.asDict()["row_num"]: row.asDict()["calc"] for row in df.select("row_num", "calc").collect()}

# Build a single CASE expression that evaluates the matching formula for each row
expression = f"""CASE {' '.join([f"WHEN row_num = '{k}' THEN ({v})" for k, v in as_dict.items()])} ELSE NULL END"""
df.withColumn("Result", F.expr(expression)).show()
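With the two sample rows above, the generated expression string ends up looking roughly like this (the exact row_num assignment may vary, since the window has no meaningful ordering):

CASE WHEN row_num = '1' THEN (param_1 - param_2) WHEN row_num = '2' THEN (2*param_1 + param_2) ELSE NULL END

The trade-off is that every formula has to be collected to the driver and baked into one CASE expression, which Spark then evaluates per row.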
Upvotes: 1