Sean

Reputation: 4515

Generate UUID column with a UDF and then split into two dataframes with common UUID column

I have a dataframe containing info about people with columns first_name, last_name, and email_addresses, where email_addresses is a comma-separated list of email addresses. The data looks something like this:

first_name last_name email_addresses
Jane Doe [email protected],[email protected]
John Smith [email protected],[email protected]

I'm trying to split this into two dataframes by first adding a person_id column populated with UUIDs using a UDF, and then creating a new dataframe by doing a split and explode on the email_addresses column.

My code looks something like this:

from uuid import uuid4
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

uuid_udf = F.udf(lambda: str(uuid4()), StringType()).asNondeterministic()

df = ingest_data_from_csv() # Imagine this function populates a dataframe from a CSV

df_with_id = df.withColumn("person_id", uuid_udf())

person_df = df_with_id.drop("email_addresses")

email_df = (
    df_with_id.drop("first_name", "last_name")
    .withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
    .drop("email_addresses")
)

I thought this would give me two dataframes with a foreign-key style relationship, looking something like this:

person_id first_name last_name
00000000-0000-0000-0000-000000000001 Jane Doe
00000000-0000-0000-0000-000000000002 John Smith

person_id email_address
00000000-0000-0000-0000-000000000001 [email protected]
00000000-0000-0000-0000-000000000001 [email protected]
00000000-0000-0000-0000-000000000002 [email protected]
00000000-0000-0000-0000-000000000002 [email protected]

But what I actually get is a different set of person_id values in each dataframe, so the rows no longer match up:

person_id first_name last_name
00000000-0000-0000-0000-000000000001 Jane Doe
00000000-0000-0000-0000-000000000002 John Smith

person_id email_address
00000000-0000-0000-0000-000000000003 [email protected]
00000000-0000-0000-0000-000000000004 [email protected]
00000000-0000-0000-0000-000000000005 [email protected]
00000000-0000-0000-0000-000000000006 [email protected]

Is there a way to implement this "foreign key" type of relationship with PySpark?

Upvotes: 0

Views: 2114

Answers (2)

mck

Reputation: 42332

Cache the dataframe so that the ids are not recomputed when the dataframe is evaluated again. You can also use the built-in Spark SQL function uuid() via F.expr('uuid()') instead of a Python UDF:

df_with_id = df.withColumn("id", F.expr('uuid()'))
# or if you want to use your udf,
# df_with_id = df.withColumn("id", uuid_udf())

df_with_id.cache()   ### IMPORTANT!

person_df = df_with_id.drop("email_addresses")
email_df = (
    df_with_id.drop("first_name", "last_name")
    .withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
    .drop("email_addresses")
)

email_df.show()
+--------------------+--------------------+
|                  id|       email_address|
+--------------------+--------------------+
|389fb85d-338f-469...|[email protected]|
|389fb85d-338f-469...|[email protected]|
|6600abe9-8087-420...|john.smith@exampl...|
|6600abe9-8087-420...|smith.john@exampl...|
+--------------------+--------------------+

person_df.show()
+----------+---------+--------------------+
|first_name|last_name|                  id|
+----------+---------+--------------------+
|      Jane|      Doe|389fb85d-338f-469...|
|      John|    Smith|6600abe9-8087-420...|
+----------+---------+--------------------+
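One caveat (my addition, not part of the answer above): cache() is only a hint, so if the cached partitions are evicted or an executor is lost, the non-deterministic id column can be recomputed with new values. A stronger guarantee is to checkpoint the dataframe, or to write it out and read it back, before deriving the two tables. A minimal sketch, assuming a SparkSession named spark and a hypothetical writable path:

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  # hypothetical directory

# checkpoint() materializes the rows and truncates the lineage,
# so the uuid() column is computed exactly once
df_with_id = df.withColumn("id", F.expr("uuid()")).checkpoint()

# Alternatively, persist the ids to storage and read them back:
# df.withColumn("id", F.expr("uuid()")).write.parquet("/tmp/people_with_id")
# df_with_id = spark.read.parquet("/tmp/people_with_id")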

Upvotes: 1

Vijay_Shinde

Reputation: 1352

You can do this with Spark's sha2 function, creating a hash value from first_name and last_name. Note that I have also included monotonically_increasing_id in the hashed value, so that two people who happen to share the same first_name and last_name still end up with different hash values.

Code:

from pyspark.sql import functions as F

df = spark.read.option("header", "true").csv("D:/DataAnalysis/person.csv")

df.show(truncate=False)
+----------+---------+---------------------------------------------+
|first_name|last_name|email_addresses                              |
+----------+---------+---------------------------------------------+
|Jane      |Doe      |[email protected],[email protected]    |
|John      |Smith    |[email protected],[email protected]|
+----------+---------+---------------------------------------------+
from pyspark.sql.functions import col, sha2, concat 

df_with_id = df.withColumn(
    "uid",
    sha2(concat(F.monotonically_increasing_id(), col("first_name"), col("last_name")), 256),
)

df_with_id.show(truncate=False)
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
|first_name|last_name|email_addresses                              |uid                                                             |
+----------+---------+---------------------------------------------+----------------------------------------------------------------+
|Jane      |Doe      |[email protected],[email protected]    |ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|
|John      |Smith    |[email protected],[email protected]|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|
+----------+---------+---------------------------------------------+----------------------------------------------------------------+

person_df = df_with_id.drop("email_addresses")

person_df.show(truncate=False)
+----------+---------+----------------------------------------------------------------+
|first_name|last_name|uid                                                             |
+----------+---------+----------------------------------------------------------------+
|Jane      |Doe      |ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|
|John      |Smith    |001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|
+----------+---------+----------------------------------------------------------------+

email_df = (
    df_with_id.drop("first_name", "last_name")
    .withColumn("email_address", F.explode(F.split(F.col("email_addresses"), ",")))
    .drop("email_addresses")
)

email_df.show(truncate=False)
+----------------------------------------------------------------+----------------------+
|uid                                                             |email_address         |
+----------------------------------------------------------------+----------------------+
|ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|[email protected]  |
|ed9d106bca3c383d07e00bbc147cb2a6625e198a0eadd275017b1c152b617a20|[email protected]  |
|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|[email protected]|
|001fb8d89c60d70c9e24d6122efc0b2c85450c8009ab1936f231055c54ccf7e6|[email protected]|
+----------------------------------------------------------------+----------------------+   
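A side note (my assumption, not from the answer itself): concat() returns NULL if any of its inputs is NULL, so a row with a missing first_name or last_name would get a NULL uid. concat_ws() skips NULL inputs, which makes the hash a bit more robust. A minimal variant of the same idea:

from pyspark.sql.functions import concat_ws, sha2, monotonically_increasing_id

# concat_ws ignores NULL columns; the id is cast to string explicitly
df_with_id = df.withColumn(
    "uid",
    sha2(
        concat_ws("|", monotonically_increasing_id().cast("string"), "first_name", "last_name"),
        256,
    ),
)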

Upvotes: 2
