justanewb
justanewb

Reputation: 133

Using spark to merge 12 large dataframes together

Say I have one dataframe that looks like this

d1 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2009','2009','2009'],
'state':['Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County'],
'A':[1,2,3],
'B':[4,5,6],
'C':[7,8,9],
}
df1 = pd.DataFrame(data=d1)

and I have another dataframe that looks like this

d2 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2010','2010','2010'],
'state':['Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County'],
'A':[11,22,33],
'B':[44,55,66],
'C':[77,88,99],
'D': [10,20,30]
}
df2 = pd.DataFrame(data=d2)

If I want to merge these two dataframes into one dataframe but stack them if the columns are the same otherwise create a new column that would give me this associated dataframe

d3 = {'Location+Type':['Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500', 'Census Tract 3, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000300', 'Census Tract 4, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000400','Census Tract 5, Jefferson County, Alabama: Summary level: 140, state:01> county:073> tract:000500'],
'Year':['2009','2009','2009','2010','2010','2010'],
'state':['Alabama','Alabama','Alabama','Alabama','Alabama','Alabama'],
'Census_tract':[3,3,3,3,3,3],
'County_name':['Jefferson County','Jefferson County','Jefferson County', 'Jefferson County','Jefferson County','Jefferson County']
'A':[1,2,3,11,22,33],
'B':[4,5,6,44,55,66],
'C':[7,8,9,77,88,99],
'D':[None,None,None,10,20,30]
}

df_final = pd.DataFrame(data=d3)

Then all I need to do using pandas is this

df = pd.concat([df1, df2]).reset_index(drop=True)

Now the problem is that when I launched a very large ec2 instance it was taking days to run and the ec2 instance I had was charging me 10/hr. I have AWS developer support and was recommended to "use EMR cluster over Glue service which gives you more control over Spark cluster size and configuration."

I have zero experience using EMR so I wanted to know what the equivalent expression would be to perform the task that I described using pandas.

Specifically, what is the equivalent expression of df = pd.concat([df1, df2]).reset_index(drop=True) in EMR using Spark?

Would it be something like this "union()" or unionAll()"? I would like to know the specific expression with the same dataframes I have for my understanding if possible.

Upvotes: 0

Views: 147

Answers (1)

Corralien
Corralien

Reputation: 120549

You can use unionByName

# Your own initialization here
df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)

# Use unionByName
df_final = df1.unionByName(df2, allowMissingColumns=True)

Output:

>>> df_final.show()
+--------------------+----+-------+------------+----------------+---+---+---+----+
|       Location+Type|Year|  state|Census_tract|     County_name|  A|  B|  C|   D|
+--------------------+----+-------+------------+----------------+---+---+---+----+
|Census Tract 3, J...|2009|Alabama|           3|Jefferson County|  1|  4|  7|null|
|Census Tract 4, J...|2009|Alabama|           3|Jefferson County|  2|  5|  8|null|
|Census Tract 5, J...|2009|Alabama|           3|Jefferson County|  3|  6|  9|null|
|Census Tract 3, J...|2010|Alabama|           3|Jefferson County| 11| 44| 77|  10|
|Census Tract 4, J...|2010|Alabama|           3|Jefferson County| 22| 55| 88|  20|
|Census Tract 5, J...|2010|Alabama|           3|Jefferson County| 33| 66| 99|  30|
+--------------------+----+-------+------------+----------------+---+---+---+----+

Update

Assuming there are 12 dataframes, how would I adjust the code?

from pyspark.sql import DataFrame
from functools import reduce, partial

df1 = spark.createDataFrame(df1)
df2 = spark.createDataFrame(df2)
df3 = spark.createDataFrame(df3)
df4 = spark.createDataFrame(df4)

dfs = [df1, df2, df3, df4]
unionByName = partial(DataFrame.unionByName, allowMissingColumns=True)
df_final = reduce(unionByName, dfs)

Output:

>>> df_final.show()
+--------------------+----+-------+------------+----------------+---+---+---+----+
|       Location+Type|Year|  state|Census_tract|     County_name|  A|  B|  C|   D|
+--------------------+----+-------+------------+----------------+---+---+---+----+
|Census Tract 3, J...|2009|Alabama|           3|Jefferson County|  1|  4|  7|null|
|Census Tract 4, J...|2009|Alabama|           3|Jefferson County|  2|  5|  8|null|
|Census Tract 5, J...|2009|Alabama|           3|Jefferson County|  3|  6|  9|null|
|Census Tract 3, J...|2010|Alabama|           3|Jefferson County| 11| 44| 77|  10|
|Census Tract 4, J...|2010|Alabama|           3|Jefferson County| 22| 55| 88|  20|
|Census Tract 5, J...|2010|Alabama|           3|Jefferson County| 33| 66| 99|  30|
|Census Tract 3, J...|2009|Alabama|           3|Jefferson County| 21| 24| 27|null|
|Census Tract 4, J...|2009|Alabama|           3|Jefferson County| 22| 25| 28|null|
|Census Tract 5, J...|2009|Alabama|           3|Jefferson County| 23| 26| 29|null|
|Census Tract 3, J...|2010|Alabama|           3|Jefferson County|111|144|177| 110|
|Census Tract 4, J...|2010|Alabama|           3|Jefferson County|122|155|188| 120|
|Census Tract 5, J...|2010|Alabama|           3|Jefferson County|133|166|199| 130|
+--------------------+----+-------+------------+----------------+---+---+---+----+

Upvotes: 1

Related Questions