Reputation: 355
I'm using Pyspark 2.1.0.
I'm attempting to perform a left outer join of two dataframes, the schemas of which appear as follows:
crimes
|-- CRIME_ID: string (nullable = true)
|-- YEAR_MTH: string (nullable = true)
|-- CRIME_TYPE: string (nullable = true)
|-- CURRENT_OUTCOME: string (nullable = true)
outcomes
|-- CRIME_ID: string (nullable = true)
|-- YEAR_MTH: string (nullable = true)
|-- FINAL_OUTCOME: string (nullable = true)
I need to be able to join crimes to outcomes based on a left outer since many outcomes exist for a single crime. I would like to exclude columns that are common to both dataframes.
I have tried the following 2 ways, but each generate various errors:
cr_outs = crimes.join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, 'left_outer')\
    .select(['crimes.'+c for c in crimes.columns] + ['outcomes.FINAL_OUTCOME'])
from pyspark.sql.functions as fn
cr_outs = crimes.alias('a').join(outcomes.alias('b'), fn.col('b.CRIME_ID') = fn.col('a.CRIME_ID'), 'left_outer')\
    .select([fn.col('a.'+c) for c in a.columns] + b.FINAL_OUTCOME)
Could anybody suggest an alternative way? Thanks.
Upvotes: 3
Views: 13569
Reputation: 125
def dropDupeDfCols(df):
    # Scan the column names left to right: keep the first occurrence of
    # each name, and record the positions of any repeats.
    newcols = []
    dupcols = []
    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)
    # Temporarily rename every column to its positional index so the
    # duplicates can be dropped unambiguously, then restore the
    # de-duplicated names.
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))
    return df.toDF(*newcols)
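To see which positions the function ends up dropping, here is a hypothetical plain-Python sketch of the same first-wins scan (no Spark needed; the column names are assumed from the schemas in the question):

```python
# Hypothetical sketch: the same duplicate-detection scan that
# dropDupeDfCols runs over df.columns, applied to a plain list of names.
def find_dupe_positions(columns):
    newcols, dupcols = [], []
    for i, name in enumerate(columns):
        if name not in newcols:
            newcols.append(name)  # first occurrence of the name: keep it
        else:
            dupcols.append(i)     # repeated name: remember its position
    return newcols, dupcols

# Column names as they would appear after joining crimes with outcomes:
joined = ["CRIME_ID", "YEAR_MTH", "CRIME_TYPE", "CURRENT_OUTCOME",
          "CRIME_ID", "YEAR_MTH", "FINAL_OUTCOME"]
keep, drop = find_dupe_positions(joined)
print(keep)  # ['CRIME_ID', 'YEAR_MTH', 'CRIME_TYPE', 'CURRENT_OUTCOME', 'FINAL_OUTCOME']
print(drop)  # [4, 5] -- the duplicated CRIME_ID and YEAR_MTH from the right side
```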
Upvotes: 0
Reputation: 355
This did the trick. It seems you have to use an alias, similar to what has been posted before, though slightly simpler in PySpark 2.1.0.
from pyspark.sql.functions import col

cr_outs = crimes.alias('a')\
    .join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, 'left_outer')\
    .select(*[col('a.'+c) for c in crimes.columns]
            + [outcomes.FINAL_OUTCOME])
cr_outs.show()
cr_outs.printSchema()
+--------------------+--------+--------------------+--------------------+---------+---------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+
| CRIME_ID|YEAR_MTH| REPORTED_BY| FALLS_WITHIN|LONGITUDE| LATITUDE| LOCATION|LSOA_CODE| LSOA_NAME| CRIME_TYPE| CURRENT_OUTCOME| FINAL_OUTCOME|
+--------------------+--------+--------------------+--------------------+---------+---------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+
|426085c2ed33af598...| 2017-01|City of London Po...|City of London Po...|-0.086051| 51.51357|On or near Finch ...|E01032739|City of London 001F| Other theft|Investigation com...|Investigation com...|
|33a3ddb8160a854a4...| 2017-01|City of London Po...|City of London Po...|-0.077777|51.518047|On or near Sandy'...|E01032
..
..
..
root
|-- CRIME_ID: string (nullable = true)
|-- YEAR_MTH: string (nullable = true)
|-- REPORTED_BY: string (nullable = true)
|-- FALLS_WITHIN: string (nullable = true)
|-- LONGITUDE: float (nullable = true)
|-- LATITUDE: float (nullable = true)
|-- LOCATION: string (nullable = true)
|-- LSOA_CODE: string (nullable = true)
|-- LSOA_NAME: string (nullable = true)
|-- CRIME_TYPE: string (nullable = true)
|-- CURRENT_OUTCOME: string (nullable = true)
|-- FINAL_OUTCOME: string (nullable = true)
As you can see, there are many more columns here than in my original post, but no duplicate columns and no renaming of columns either :-)
Upvotes: 2
Reputation: 3619
You can temporarily rename the common columns to remove the ambiguity:
crimes = crimes\
    .withColumnRenamed('CRIME_ID', 'CRIME_ID_1')\
    .withColumnRenamed('YEAR_MTH', 'YEAR_MTH_1')

required_columns = [c for c in crimes.columns] + ['FINAL_OUTCOME']

cr_outs = crimes\
    .join(outcomes, crimes.CRIME_ID_1 == outcomes.CRIME_ID, 'left_outer')\
    .select(required_columns)
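For illustration, the renaming step can be generalised to suffix every column name the two dataframes share. This is a hypothetical plain-Python sketch (no Spark), with the column lists assumed from the schemas in the question:

```python
# Hypothetical sketch: suffix every crimes column that also exists in
# outcomes, mirroring the withColumnRenamed calls above.
crimes_cols = ["CRIME_ID", "YEAR_MTH", "CRIME_TYPE", "CURRENT_OUTCOME"]
outcomes_cols = ["CRIME_ID", "YEAR_MTH", "FINAL_OUTCOME"]

common = set(crimes_cols) & set(outcomes_cols)
renamed = [c + "_1" if c in common else c for c in crimes_cols]
print(renamed)  # ['CRIME_ID_1', 'YEAR_MTH_1', 'CRIME_TYPE', 'CURRENT_OUTCOME']

# The select list then keeps every (renamed) crimes column plus FINAL_OUTCOME:
required_columns = renamed + ["FINAL_OUTCOME"]
print(required_columns)
```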
Upvotes: 0