Nabih Bawazir

Reputation: 7265

Merging two DataFrames with multiple filter conditions in PySpark

Here's my dataset sparkDF1

Id   Value  month   Year
1    672        4   2020  
1    356        6   2020
2    683        6   2019  
3    366        4   2021

Here's my dataset sparkDF2

Id   Value  month   Year
1    671        4   2020  
1    353        6   2020
2    682        6   2019  
3    363        4   2021

Here's my expected dataset sparkDF, which takes rows from sparkDF2 from month=5, Year=2020 onward and from sparkDF1 before that

Id   Value  month   Year
1    672        4   2020  
1    353        6   2020
2    683        6   2019  
3    363        4   2021

The pandas alternative is

df = df1.mask((df1['month'].ge(5) & df1['Year'].eq(2020)) | df1['Year'].ge(2021), df2)
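Here mask keeps df1's values where the condition is False and takes the corresponding values from df2 where it is True.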

Upvotes: 0

Views: 260

Answers (2)

wwnde

Reputation: 26676

Option 1: Filter and unionByName

# Rows to replace: month >= 5 in 2020, or anything from 2021 onward.
s = ((df1.month >= 5) & (df1.Year == 2020)) | (df1.Year >= 2021)
s1 = ((df2.month >= 5) & (df2.Year == 2020)) | (df2.Year >= 2021)

# Keep df1 where the condition is False, take df2 where it is True.
new = df1.where(~s).unionByName(df2.where(s1)).orderBy('Id')
new.show()

+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  1|  353|    6|2020|
|  2|  683|    6|2019|
|  3|  363|    4|2021|
+---+-----+-----+----+
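For comparison, the same result can be had in one pass with an equi-join and when/otherwise; a minimal sketch, assuming (Id, month, Year) uniquely identifies each row in both frames:

import pyspark.sql.functions as F

# Take df2's Value where the condition holds, df1's otherwise.
cond = ((F.col('month') >= 5) & (F.col('Year') == 2020)) | (F.col('Year') >= 2021)

new = (df1.alias('a')
          .join(df2.alias('b'), on=['Id', 'month', 'Year'])
          .select('Id',
                  F.when(cond, F.col('b.Value')).otherwise(F.col('a.Value')).alias('Value'),
                  'month', 'Year')
          .orderBy('Id'))
new.show()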

Option 2: If you already have pandas code, you can use pandas UDFs. The catch is that pandas UDFs that take two DataFrames go through the cogroup method, which incurs a shuffle. In your case I would use pandas' combine_first, or just what you did. Code below:

import pandas as pd

def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # Blank out the rows of df1 that should come from df2, then fill
    # the resulting NaNs from df2 via combine_first.
    l = l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021))
    return l.combine_first(r)

df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()

or

import pandas as pd

def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # Replace the rows of df1 that match the condition with the rows of df2.
    return l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021), r)

df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  1|  353|    6|2020|
|  2|  683|    6|2019|
|  3|  363|    4|2021|
+---+-----+-----+----+
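Note that both frames are cogrouped on ['month', 'Year'], so each call to mask_filter only sees the rows of df1 and df2 that share a key; in this example every key matches exactly one row per frame, so the positional alignment that mask relies on is trivially satisfied.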

Upvotes: 1

Arnon Rotem-Gal-Oz

Reputation: 25919

You can filter each df on its own and then union the two. Alternatively, you can use Koalas, which gives you pandas syntax on Spark.
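For the Koalas route, here is a minimal sketch using the pandas API on Spark (Koalas was folded into PySpark 3.2+ as pyspark.pandas); it assumes mask accepts a DataFrame as other, as it does in pandas:

import pyspark.pandas as ps

# Combining two different pandas-on-Spark frames requires this option.
ps.set_option('compute.ops_on_diff_frames', True)

kdf1 = df1.pandas_api()  # Spark DataFrame -> pandas-on-Spark DataFrame
kdf2 = df2.pandas_api()

# Same logic as the pandas one-liner in the question.
result = kdf1.mask((kdf1['month'].ge(5) & kdf1['Year'].eq(2020)) | kdf1['Year'].ge(2021), kdf2)
result.to_spark().orderBy('Id').show()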

Upvotes: 1
