Viv
Viv

Reputation: 1584

How to set column values from different table in pyspark?

In Pyspark - How to Set column values of a column(listed_1) in Table A with values from Table B (list_date) on a where condition (B.list_expire_value) > 5 || (B.list_date) < 6. The (B.) is to show that they are Table B's columns.

Currently I am doing:

  spark_df = table_1.join("table_2", on ="uuid").when((table_2['list_expire_value'] > 5) | (table_2['list_date'] < 6)).withColumn("listed_1", table_2['list_date'])

But I am getting an error. How to do this?

Sample table : 

Table A
uuid   listed_1
001    abc
002    def
003    ghi

Table B
uuid    list_date    list_expire_value     col4
001     12           7                     dckvfd
002     14           3                     dfdfgi
003     3            8                     sdfgds

Expected Output
uuid    listed1      list_expire_value     col4
001     12           7                     dckvfd
002     def          3                     dfdfgi
003     3            8                     sdfgds

002 of listed1 will not be replaced since they do not fufil the when conditions.

Upvotes: 1

Views: 3905

Answers (2)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41987

Correct form of pyspark sql query is

from pyspark.sql import functions as F
spark_df = table_1.join(table_2, 'uuid', 'inner').withColumn('list_expire_value',F.when((table_2.list_expire_value > 5) | (table_2.list_date < 6), table_1.listed_1).otherwise(table_2.list_date)).drop(table_1.listed_1)

Upvotes: 1

Prem
Prem

Reputation: 11985

Hope this helps!

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

A = sc.parallelize([('001','abc'),('002','def'),('003','ghi')]).toDF(['uuid','listed_1'])
B = sc.parallelize([('001',12,7,'dckvfd'),('002',14,3,'dfdfgi'),('003',3,8,'sdfgds')]).\
    toDF(['uuid','list_date','list_expire_value','col4'])

def cond_fn(x, y, z):
    if (x > 5 or y < 6):
        return y
    else:
        return z

final_df = A.join(B, on="uuid")
udf_val = udf(cond_fn, StringType())
final_df = final_df.withColumn("listed1",udf_val(final_df.list_expire_value,final_df.list_date, final_df.listed_1))
final_df.select(["uuid","listed1","list_expire_value","col4"]).show()


Don't forget to let us know if it solved your problem :)

Upvotes: 1

Related Questions