orNehPraka

Reputation: 443

Left outer join in PySpark and select records which exist only in the left table

I have to write a PySpark join query. My requirement is to select only the records which exist in the left table.

SQL solution for this is :

SELECT Left.*
FROM Left LEFT OUTER JOIN Right ON <join condition>
WHERE Right.column1 IS NULL AND Right.column2 IS NULL
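
(For reference: I believe Spark SQL can express this pattern directly as an anti join, without any IS NULL checks. A minimal sketch, assuming Spark 2.0+; the view names left_tbl/right_tbl and the key column k are placeholders:)

# Hypothetical names: left_tbl / right_tbl views and a single join key "k".
left_df.createOrReplaceTempView("left_tbl")
right_df.createOrReplaceTempView("right_tbl")
only_left = sqlContext.sql("""
    SELECT L.*
    FROM left_tbl L
    LEFT ANTI JOIN right_tbl R
      ON L.k = R.k
""")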

The challenge for me is that these two tables are DataFrames which I create at run time, so I do not know the right DataFrame's columns or how many there are. I have to run this IS NULL check on every column of the right DataFrame.

I need your help to resolve this; any kind of run-time feature would help.

The code I have so far -

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row
from pyspark.sql.functions import col, struct, udf
import hashlib

emp = [('Ram',25,12,1), ('Jalfaizy',22,13,2), ('saurabh',20,14,3), ('Bala',26,15,4)]
rddemp = sc.parallelize(emp)
# build Rows from the tuples; ign is dropped before the comparison
emp1 = rddemp.map(lambda x: Row(name=x[0], dept=int(x[1]), col=x[2], ign=x[3]))
empDF = sqlContext.createDataFrame(emp1)

dept = [('Ram',25,12,1), ('Jalfaizy',22,16,3), ('Kukarm',50,17,4)]
rdddept = sc.parallelize(dept)
dept1 = rdddept.map(lambda x: Row(name=x[0], dept=int(x[1]), col=x[2], ign=x[3]))
deptDF = sqlContext.createDataFrame(dept1)

# drop the column that should be ignored in the comparison
empDF1 = empDF.drop("ign")
deptDF1 = deptDF.drop("ign")

# hash the whole row into one comparison column; pbkdf2_hmac needs bytes,
# and .hex() turns the digest into a string the udf can return
make_sha = udf(lambda row: hashlib.pbkdf2_hmac('sha512', str(row).encode('utf-8'), b'salt', 100000).hex())
src_sha = empDF1.withColumn("sha512", make_sha(struct([empDF1[x] for x in empDF1.columns])))
tgt_sha = deptDF1.withColumn("sha512", make_sha(struct([deptDF1[x] for x in deptDF1.columns])))

tblPrimaryKeyList = "dept|name".split('|')

stgActiveDF = src_sha.alias('STG').join(tgt_sha.alias('TGT'), "sha512", 'left_outer').where(col("TGT.name").isNull())\
    .select("STG.*").drop("sha512").dropDuplicates()

The problem area where I need help is below. I have to replace TGT.name with something like "TGT.columns" that covers every column of the right DataFrame:

stgActiveDF = src_sha.alias('STG').join(tgt_sha.alias('TGT'),"sha512",'left_outer').where(col("TGT.name").isNull()).select("STG.*").drop("sha512").dropDuplicates()
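
I suspect a left anti join might sidestep the column list entirely, since it keeps only the left rows with no match on the join key, but I am not sure it is fully equivalent. A sketch, assuming Spark 2.0+ where 'left_anti' is available:

# 'left_anti' keeps only src_sha rows whose sha512 has no match in tgt_sha,
# so no per-column IS NULL checks are needed; only left-side columns remain.
stgActiveDF = src_sha.join(tgt_sha, "sha512", "left_anti") \
                     .drop("sha512").dropDuplicates()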

Thanks in Advance.

Upvotes: 2

Views: 2453

Answers (1)

user3772914

Reputation: 41

A naive approach could be to piece together the predicate using basic string composition:

where_clause = ' AND '.join([f'TGT.{c} IS NULL' for c in tgt_sha.columns])
joined_df = df1.alias('STG').join(df2.alias('TGT'), "join_col", 'left_outer').where(where_clause)
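
If you prefer to stay with Column objects instead of strings, the same predicate can be folded together with functools.reduce. A sketch against the question's DataFrames; the join key is left out because joining on a column name merges it into a single column:

from functools import reduce
from pyspark.sql.functions import col

# One isNull() check per right-hand column, AND-ed together; the loop
# variable is named c so it does not shadow pyspark.sql.functions.col.
checks = [col(f"TGT.{c}").isNull() for c in tgt_sha.columns if c != "sha512"]
predicate = reduce(lambda a, b: a & b, checks)
stgActiveDF = (src_sha.alias('STG')
               .join(tgt_sha.alias('TGT'), "sha512", 'left_outer')
               .where(predicate)
               .select("STG.*"))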

Upvotes: 1
