Arnoldo Oliva

Reputation: 123

PySpark join of two dataframes results in duplicated values

I have a problem in PySpark when joining two dataframes. The first dataframe is a single-column dataframe "zipcd", and the second one is a dataframe with four columns.

The problem arises whenever I try to join the two dataframes: in the resulting dataframe, the zipcd column holds the same value in every row (the first row's value is repeated all the way down, which is not what the data actually looks like).

For instance:

Zip.select("Zip").show()
+------------+
|         Zip|
+------------+
| 6.0651002E8|
| 6.0623002E8|
| 6.0077203E8|
| 6.0626528E8|
| 6.0077338E8|
|         0.0|
+------------+

and the other dataframe is zip_cd1:

zip_cd1.show()
+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
|    0|
+-----+

Whenever I try to join the dataframes, the following always happens:

Zip1=zip_cd1.join(Zip).select('Zip','zipcd')
Zip1.show()
+------------+-----+
|         Zip|zipcd|
+------------+-----+
| 6.0651002E8|60651|
| 6.0623002E8|60651|
| 6.0077203E8|60651|
| 6.0626528E8|60651|
| 6.0077338E8|60651|
|         0.0|60651|
+------------+-----+

It happens no matter which type of join I use, and I have no idea what's happening.

Expected output:

+------------+-----+
|         Zip|zipcd|
+------------+-----+
| 6.0651002E8|60651|
| 6.0623002E8|60623|
| 6.0077203E8|60077|
| 6.0626528E8|60626|
| 6.0077338E8|60077|
|         0.0|    0|
+------------+-----+
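For reference, `zip_cd1.join(Zip)` passes no join condition, so Spark builds the Cartesian product of the two dataframes, and show() displays only its first rows, which all pair the first zipcd with every Zip. A minimal sketch of that effect, reusing the question's dataframe names:

Zip1 = zip_cd1.join(Zip)             # no `on=` -> cross join: 6 x 6 = 36 rows
Zip1.count()                         # 36 instead of the expected 6
Zip1.select('Zip', 'zipcd').show(6)  # the first rows all carry zipcd 60651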

Upvotes: 1

Views: 487

Answers (2)

Rafa

Reputation: 527

I understood the data such that zip code 60651 is stored as 6.0651002E8. With that assumption, I have given the solution below.

    >>> df = spark.read.text('test.dat')
    >>> df.show()
    +-----------+
    |      value|
    +-----------+
    |6.0651002E8|
    |6.0623002E8|
    |6.0077203E8|
    |6.0626528E8|
    |6.0077338E8|
    |        0.0|
    +-----------+
    >>> df.createOrReplaceTempView("TempView")
    >>> df = spark.sql("select substring(cast(cast(value as decimal) as string),1,5) zipcd, value as zip from TempView")
    >>> df.show()
    +-----+-----------+
    |zipcd|        zip|
    +-----+-----------+
    |60651|6.0651002E8|
    |60623|6.0623002E8|
    |60077|6.0077203E8|
    |60626|6.0626528E8|
    |60077|6.0077338E8|
    |    0|        0.0|
    +-----+-----------+
    from pyspark.sql.types import StructType, StructField, StringType
    
    data = [
        ("60651", "State1", "County1"),
        ("60623", "State2", "County2"),
        ("60077", "State2", "County3"),
        ("60626", "State2", "County4"),
        ("60077", "State2", "County5"),
        ("0", "Unknown", "Unknown"),
    ]

    schema = StructType([
        StructField('zip', StringType(), True),
        StructField('state', StringType(), True),
        StructField('county', StringType(), True),
    ])
    df1 = spark.createDataFrame(data=data, schema=schema)
    df1.show()
    df.createOrReplaceTempView("Data")
    df1.createOrReplaceTempView("Data1")
>>> spark.sql("select * from Data1 A inner join Data B on A.zip=b.zipcd").show()
+-----+------+-------+-----+-----------+
|  zip| state| county|zipcd|        zip|
+-----+------+-------+-----+-----------+
|60651|State1|County1|60651|6.0651002E8|
|60623|State2|County2|60623|6.0623002E8|
|60077|State2|County3|60077|6.0077338E8|
|60077|State2|County3|60077|6.0077203E8|
|60626|State2|County4|60626|6.0626528E8|
|60077|State2|County5|60077|6.0077338E8|
|60077|State2|County5|60077|6.0077203E8|
|    0|Unkown| Unkown|    0|        0.0|
+-----+------+-------+-----+-----------+
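Note that zip code 60077 appears twice on both sides, so the inner join fans out to four rows for it. If each raw value should match at most one lookup row, one option is to deduplicate the lookup side first; a minimal sketch, assuming the `df` and `df1` built above (dropDuplicates keeps an arbitrary row per key):

    >>> # Keep one lookup row per zip code, then join each raw value once
    >>> df1_dedup = df1.dropDuplicates(['zip'])
    >>> df.join(df1_dedup, df.zipcd == df1_dedup.zip, 'inner').show()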

Upvotes: 1

werner

Reputation: 14845

If both dataframes have the same number of partitions and the same number of elements in each partition, you can use RDD.zip and then re-create a dataframe from the result:

# `zip` and `zipcd` are the two dataframes from the question
zipped_rdd = zip.rdd.zip(zipcd.rdd).map(lambda x: (x[0]['Zip'], x[1]['zipcd']))
df = spark.createDataFrame(zipped_rdd, schema=['Zip', 'zipcd'])

If the dataframes have different numbers of partitions or rows, RDD.zipWithIndex and then a full outer join can be used instead of the zip:

zipped_rdd = zip.rdd.zipWithIndex().map(lambda x: (x[1], x[0])).fullOuterJoin(
    zipcd.rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
).map(lambda x: (x[1][0]['Zip'] if x[1][0] is not None else None,
                 x[1][1]['zipcd'] if x[1][1] is not None else None))
df = spark.createDataFrame(zipped_rdd, schema=['Zip', 'zipcd'])

Result:

+-----------+-----+
|        Zip|zipcd|
+-----------+-----+
|6.0651002E8|60651|
|6.0623002E8|60623|
|6.0077203E8|60077|
|6.0626528E8|60626|
|6.0077338E8|60077|
|        0.0|    0|
+-----------+-----+
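A DataFrame-level sketch of the same zipWithIndex idea, assuming the question's `Zip` and `zip_cd1` dataframes: attach a positional index to each side, then join on it.

from pyspark.sql import functions as F

# zipWithIndex yields (Row, index) pairs; turn them into dataframes
zip_idx = Zip.rdd.zipWithIndex().toDF(['row', 'idx']) \
    .select(F.col('row')['Zip'].alias('Zip'), 'idx')
cd_idx = zip_cd1.rdd.zipWithIndex().toDF(['row', 'idx']) \
    .select(F.col('row')['zipcd'].alias('zipcd'), 'idx')

# Join on the positional index to pair rows one-to-one
zip_idx.join(cd_idx, 'idx', 'inner').select('Zip', 'zipcd').show()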

Upvotes: 1
