Reputation: 79
So I have two DataFrames which I want to join. The catch is that the second table stores comma-separated values, one of which matches a column in Table A. How do I do it in PySpark? Below is an example.
Table A has
+-------+---------+
|deal_id|deal_name|
+-------+---------+
| 613760|ABCDEFGHI|
| 613740|TEST123  |
| 598946|OMG      |
+-------+---------+
Table B has
+-----------------------------------+---------+
|deal_id                            |deal_type|
+-----------------------------------+---------+
|613760,613761,613762,613763        |Direct De|
|613740,613750,613770,613780,613790 |Direct   |
|598946                             |In       |
+-----------------------------------+---------+
Expected result - join Table A and Table B whenever Table A's deal_id matches one of the comma-separated values in Table B's deal_id column. For instance, TableA.deal_id 613760 appears in Table B's first row, so I want that row returned.
+-------+---------+---------+
|deal_id|deal_name|deal_type|
+-------+---------+---------+
| 613760|ABCDEFGHI|Direct De|
| 613740|TEST123  |Direct   |
| 598946|OMG      |In       |
+-------+---------+---------+
Any assistance is appreciated. I need it in PySpark.
Thanks.
Upvotes: 0
Views: 1135
Reputation: 2647
Sample data
from pyspark.sql.types import StringType, StructField, StructType

tuples_a = [('613760', 'ABCDEFGHI'),
            ('613740', 'TEST123'),
            ('598946', 'OMG'),
            ]
schema_a = StructType([
    StructField('deal_id', StringType(), nullable=False),
    StructField('deal_name', StringType(), nullable=False)
])

tuples_b = [('613760,613761,613762,613763', 'Direct De'),
            ('613740,613750,613770,613780,613790', 'Direct'),
            ('598946', 'In'),
            ]
schema_b = StructType([
    StructField('deal_id', StringType(), nullable=False),
    StructField('deal_type', StringType(), nullable=False)
])

df_a = spark_session.createDataFrame(data=tuples_a, schema=schema_a)
df_b = spark_session.createDataFrame(data=tuples_b, schema=schema_b)
You need to split the column and explode it in order to join.
from pyspark.sql.functions import split, col, explode
df_b = df_b.withColumn('split', split(col('deal_id'), ','))\
           .withColumn('exploded', explode(col('split')))\
           .drop('deal_id', 'split')\
           .withColumnRenamed('exploded', 'deal_id')

df_a.join(df_b, on='deal_id', how='left_outer')\
    .show(10, False)
and the expected result
+-------+---------+---------+
|deal_id|deal_name|deal_type|
+-------+---------+---------+
|613760 |ABCDEFGHI|Direct De|
|613740 |TEST123 |Direct |
|598946 |OMG |In |
+-------+---------+---------+
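For intuition, the same split-then-explode-then-join logic can be sketched in plain Python without Spark; the `rows_a`/`rows_b` names here are just illustrative stand-ins for the sample DataFrames above:

```python
# Plain-Python sketch of the split -> explode -> left-join logic.
# The data mirrors the sample DataFrames above.
rows_a = [('613760', 'ABCDEFGHI'), ('613740', 'TEST123'), ('598946', 'OMG')]
rows_b = [('613760,613761,613762,613763', 'Direct De'),
          ('613740,613750,613770,613780,613790', 'Direct'),
          ('598946', 'In')]

# "Explode": emit one (single_id, deal_type) pair per comma-separated value.
exploded_b = [(single_id.strip(), deal_type)
              for ids, deal_type in rows_b
              for single_id in ids.split(',')]

# Left join on deal_id, like the Spark join above.
lookup = dict(exploded_b)
result = [(deal_id, name, lookup.get(deal_id)) for deal_id, name in rows_a]
# result == [('613760', 'ABCDEFGHI', 'Direct De'),
#            ('613740', 'TEST123', 'Direct'),
#            ('598946', 'OMG', 'In')]
```

In Spark itself the explode can also be avoided by joining on a condition such as `expr("array_contains(split(b.deal_id, ','), a.deal_id)")`, but with larger data the exploded equi-join usually lets Spark choose a better join strategy than a non-equi condition.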
Upvotes: 1