Reputation: 123
I have a problem in PySpark when joining two dataframes. The first is a single-column dataframe, "zipcd", and the second is a dataframe with four columns.
The problem is that whenever I join the two dataframes, the zipcd column in the resulting dataframe contains the same value in every row: the first row's value is duplicated all the way down, which does not match the actual data.
For instance:
Zip.select("Zip").show()
+------------+
|         Zip|
+------------+
| 6.0651002E8|
| 6.0623002E8|
| 6.0077203E8|
| 6.0626528E8|
| 6.0077338E8|
|         0.0|
+------------+
and the other dataframe is zip_cd1:
zip_cd1.show()
+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
|    0|
+-----+
Whenever I try to join the dataframes, the following always happens:
Zip1=zip_cd1.join(Zip).select('Zip','zipcd')
Zip1.show()
+------------+-----+
|         Zip|zipcd|
+------------+-----+
| 6.0651002E8|60651|
| 6.0623002E8|60651|
| 6.0077203E8|60651|
| 6.0626528E8|60651|
| 6.0077338E8|60651|
|         0.0|60651|
+------------+-----+
It happens no matter which type of join I use, and I have no idea what's happening.
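For reference, a minimal reconstruction of the two dataframes (approximated from the output above; the original loading code is not shown):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Approximate reconstruction of the two dataframes shown above
Zip = spark.createDataFrame(
    [(6.0651002E8,), (6.0623002E8,), (6.0077203E8,),
     (6.0626528E8,), (6.0077338E8,), (0.0,)], ['Zip'])
zip_cd1 = spark.createDataFrame(
    [(60651,), (60623,), (60077,), (60626,), (60077,), (0,)], ['zipcd'])

Zip1 = zip_cd1.join(Zip).select('Zip', 'zipcd')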
Expected output:
+------------+-----+
|         Zip|zipcd|
+------------+-----+
| 6.0651002E8|60651|
| 6.0623002E8|60623|
| 6.0077203E8|60077|
| 6.0626528E8|60626|
| 6.0077338E8|60077|
|         0.0|    0|
+------------+-----+
Upvotes: 1
Views: 487
Reputation: 527
I understood the data to mean that zip code 60651 is stored as 6.0651002E8. With that assumption, I have given the solution below.
>>> df = spark.read.text('test.dat')
>>> df.show()
+-----------+
|      value|
+-----------+
|6.0651002E8|
|6.0623002E8|
|6.0077203E8|
|6.0626528E8|
|6.0077338E8|
|        0.0|
+-----------+
df.createOrReplaceTempView("TempView")
df = spark.sql("select substring(cast(cast(value as decimal) as string),1,5) zipcd, value as zip from TempView")
df.show()
+-----+-----------+
|zipcd|        zip|
+-----+-----------+
|60651|6.0651002E8|
|60623|6.0623002E8|
|60077|6.0077203E8|
|60626|6.0626528E8|
|60077|6.0077338E8|
|    0|        0.0|
+-----+-----------+
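The same derivation can also be written with the DataFrame API instead of SQL (a sketch under the same assumption about how the zip codes are encoded; test.dat is the same file as above):
import pyspark.sql.functions as f

# Cast "6.0651002E8" -> 606510020 (decimal), stringify, keep the first 5 characters
df = spark.read.text('test.dat').select(
    f.substring(f.col('value').cast('decimal(10,0)').cast('string'), 1, 5).alias('zipcd'),
    f.col('value').alias('zip')
)
df.show()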
from pyspark.sql.types import StructType, StructField, StringType
data = [
    ("60651", "State1", "County1"),
    ("60623", "State2", "County2"),
    ("60077", "State2", "County3"),
    ("60626", "State2", "County4"),
    ("60077", "State2", "County5"),
    ("0", "Unknown", "Unknown")
]
schema = StructType([
    StructField('zip', StringType(), True),
    StructField('state', StringType(), True),
    StructField('county', StringType(), True),
])
df1 = spark.createDataFrame(data=data, schema=schema)
df1.show()
df.createOrReplaceTempView("Data")
df1.createOrReplaceTempView("Data1")
>>> spark.sql("select * from Data1 A inner join Data B on A.zip=B.zipcd").show()
+-----+-------+-------+-----+-----------+
|  zip|  state| county|zipcd|        zip|
+-----+-------+-------+-----+-----------+
|60651| State1|County1|60651|6.0651002E8|
|60623| State2|County2|60623|6.0623002E8|
|60077| State2|County3|60077|6.0077338E8|
|60077| State2|County3|60077|6.0077203E8|
|60626| State2|County4|60626|6.0626528E8|
|60077| State2|County5|60077|6.0077338E8|
|60077| State2|County5|60077|6.0077203E8|
|    0|Unknown|Unknown|    0|        0.0|
+-----+-------+-------+-----+-----------+
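The same join can equivalently be written with the DataFrame API (a sketch using the df and df1 defined above):
>>> df1.join(df, df1.zip == df.zipcd, 'inner').show()
Since 60077 appears more than once on both sides, the inner join multiplies those rows, which is why it shows up four times in the output above.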
Upvotes: 1
Reputation: 14845
If both dataframes have the same number of partitions and the same number of elements in each partition, you can use RDD.zip and then re-create a dataframe from the result:
zipped_rdd = Zip.rdd.zip(zip_cd1.rdd).map(lambda x: (x[0]['Zip'], x[1]['zipcd']))
df = spark.createDataFrame(zipped_rdd, schema=['Zip', 'zipcd'])
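If the row counts match but the partitioning doesn't, one blunt way to force matching partitions on small data is coalesce(1) on both sides (a sketch, not suitable for large datasets):
zipped_rdd = Zip.coalesce(1).rdd.zip(zip_cd1.coalesce(1).rdd).map(lambda x: (x[0]['Zip'], x[1]['zipcd']))
df = spark.createDataFrame(zipped_rdd, schema=['Zip', 'zipcd'])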
If the dataframes have different numbers of partitions or rows, RDD.zipWithIndex followed by a full outer join on the index can be used instead:
zipped_rdd = Zip.rdd.zipWithIndex().map(lambda x: (x[1], x[0])).fullOuterJoin(
    zip_cd1.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
).map(lambda x: (x[1][0]['Zip'] if x[1][0] is not None else None,
                 x[1][1]['zipcd'] if x[1][1] is not None else None))
df = spark.createDataFrame(zipped_rdd, schema=['Zip', 'zipcd'])
Result:
+-----------+-----+
|        Zip|zipcd|
+-----------+-----+
|6.0651002E8|60651|
|6.0623002E8|60623|
|6.0077203E8|60077|
|6.0626528E8|60626|
|6.0077338E8|60077|
|        0.0|    0|
+-----------+-----+
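Note that RDD.zipWithIndex assigns indices from the row order within and across partitions, so both approaches pair rows by their existing order rather than by any key.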
Upvotes: 1