Cassius Clay

How to capture changes in records in PySpark?

I have two dataframes df1 and df2.

Below is dataframe df1.

EmpID  Work   StartTime                     StopTime
1      Call   2023-06-02T10:00:00.000+0000  2023-06-02T12:10:00.000+0000
1      Meal   2023-06-02T12:10:00.000+0000  2023-06-02T13:00:00.000+0000
1      Call   2023-06-02T13:00:00.000+0000  2023-06-02T15:00:00.000+0000
1      Break  2023-06-02T15:00:00.000+0000  2023-06-02T15:15:00.000+0000
1      Call   2023-06-02T15:15:00.000+0000  2023-06-02T17:00:00.000+0000
1      Break  2023-06-02T17:00:00.000+0000  2023-06-02T17:15:00.000+0000
1      Call   2023-06-02T17:15:00.000+0000  2023-06-02T19:00:00.000+0000

Below is dataframe df2.

EmpID  Work         StartTime                     StopTime
1      Planned OOF  2023-06-02T10:00:00.000+0000  2023-06-02T12:00:00.000+0000
1      Meal         2023-06-02T12:00:00.000+0000  2023-06-02T13:00:00.000+0000
1      Planned OOF  2023-06-02T13:00:00.000+0000  2023-06-02T19:00:00.000+0000

Expected Output:

1) I have to find the records in df2 where the type of 'Work' done in df2 for an EmpID on a date is not present in df1 for the same EmpID on the same date.

Example: 'Planned OOF' is present in df2 for EmpID 1 on June 2nd but not in df1 for EmpID 1 on June 2nd. I have to capture the two 'Planned OOF' records from df2 and add '00' as a Type column to them.

2) I have to find the Work values common to df2 and df1 for the same EmpID on the same day.

Example: the 'Meal' Work is present for EmpID 1 on June 2nd in df2, and the same 'Meal' Work is present for EmpID 1 on June 2nd in df1. I have to capture the record from df1 and assign 10 as Type, and capture the record from df2 and assign 11 as Type.

EmpID  Work         StartTime                     StopTime                      Type
1      Planned OOF  2023-06-02T10:00:00.000+0000  2023-06-02T12:00:00.000+0000  00
1      Planned OOF  2023-06-02T13:00:00.000+0000  2023-06-02T19:00:00.000+0000  00
1      Meal         2023-06-02T12:10:00.000+0000  2023-06-02T13:00:00.000+0000  10
1      Meal         2023-06-02T12:00:00.000+0000  2023-06-02T13:00:00.000+0000  11

Extra: below is the code used to create df1 and df2.

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("EmpID", StringType(), True),
    StructField("Work", StringType(), True),
    StructField("StartTime", StringType(), True),
    StructField("StopTime", StringType(), True)
])

data = [
    ("1", "Call", "2023-06-02T10:00:00.000+0000", "2023-06-02T12:10:00.000+0000"),
    ("1", "Meal", "2023-06-02T12:10:00.000+0000", "2023-06-02T13:00:00.000+0000"),
    ("1", "Call", "2023-06-02T13:00:00.000+0000", "2023-06-02T15:00:00.000+0000"),
    ("1", "Break", "2023-06-02T15:00:00.000+0000", "2023-06-02T15:15:00.000+0000"),
    ("1", "Call", "2023-06-02T15:15:00.000+0000", "2023-06-02T17:00:00.000+0000"),
    ("1", "Break", "2023-06-02T17:00:00.000+0000", "2023-06-02T17:15:00.000+0000"),
    ("1", "Call", "2023-06-02T17:15:00.000+0000", "2023-06-02T19:00:00.000+0000")
]

df1 = spark.createDataFrame(data, schema)

df1 = df1.withColumn("StartTime", to_timestamp("StartTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
df1 = df1.withColumn("StopTime", to_timestamp("StopTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))

Below is for df2:

data = [
    ("1", "Planned OOF", "2023-06-02T10:00:00.000+0000", "2023-06-02T12:00:00.000+0000"),
    ("1", "Meal", "2023-06-02T12:00:00.000+0000", "2023-06-02T13:00:00.000+0000"),
    ("1", "Planned OOF", "2023-06-02T13:00:00.000+0000", "2023-06-02T19:00:00.000+0000")
]

df2 = spark.createDataFrame(data, schema)

# Convert StartTime and StopTime columns to timestamp data type
df2 = df2.withColumn("StartTime", to_timestamp("StartTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
df2 = df2.withColumn("StopTime", to_timestamp("StopTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))


Answers (1)

Aswin

Step 1

You can use a left_anti join to find the records in df2 where the type of 'Work' done in df2 for an EmpID on a date is not present in df1 for the same EmpID on the same date, then add '00' as the Type column to that dataframe. Below is the code.

Code:

from pyspark.sql.functions import date_format, col, lit

# Records in df2 whose (EmpID, date, Work) combination has no match in df1
df2_not_in_df1 = df2.join(df1, (df1.EmpID == df2.EmpID)
                          & (date_format(df1.StartTime, 'yyyy-MM-dd') == date_format(df2.StartTime, 'yyyy-MM-dd'))
                          & (df1.Work == df2.Work), 'left_anti')

# Add '00' as Type column to the filtered records
df2_not_in_df1 = df2_not_in_df1.withColumn('Type', lit('00'))

# Select the required columns
df2_not_in_df1 = df2_not_in_df1.select('EmpID', 'Work', 'StartTime', 'StopTime','Type')

# Show the output
df2_not_in_df1.show()

EmpID  Work         StartTime                     StopTime                      Type
1      Planned OOF  2023-06-02T10:00:00.000+0000  2023-06-02T12:00:00.000+0000  00
1      Planned OOF  2023-06-02T13:00:00.000+0000  2023-06-02T19:00:00.000+0000  00
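
As a side note, the same anti-join can be written against an explicit day-level key instead of repeating date_format in every condition. Below is a minimal sketch, assuming matching is only on EmpID, calendar day, and Work (the day column and the df1_keys / df2_not_in_df1_alt names are mine):

from pyspark.sql.functions import to_date, lit

# Day-level keys present in df1
df1_keys = df1.select('EmpID', 'Work', to_date('StartTime').alias('day'))

# Rows of df2 whose (EmpID, Work, day) never occurs in df1
df2_not_in_df1_alt = (
    df2.withColumn('day', to_date('StartTime'))
       .join(df1_keys, on=['EmpID', 'Work', 'day'], how='left_anti')
       .drop('day')
       .withColumn('Type', lit('00'))
)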

Step 2

You can use an inner join to find the Work values common to df2 and df1 for the same EmpID on the same day, then capture the record from df1 and assign 10 as Type, and capture the record from df2 and assign 11 as Type.

Code:

# Find common Work in df2 and df1 for the same EmpID on the same day
# Rename the columns of df2 so they can be referenced unambiguously after the join
df2_renamed = df2.select([col(c).alias(c + '_df2') for c in df2.columns])

df_common = df2_renamed.join(df1, (df1.EmpID == df2_renamed.EmpID_df2)
                             & (date_format(df1.StartTime, 'yyyy-MM-dd') == date_format(df2_renamed.StartTime_df2, 'yyyy-MM-dd'))
                             & (df1.Work == df2_renamed.Work_df2), 'inner').select(df2_renamed.columns)

# Tag the matching df1 records with Type 10
df1_filtered = df1.join(df_common, (df1.EmpID == df_common.EmpID_df2)
                        & (date_format(df1.StartTime, 'yyyy-MM-dd') == date_format(df_common.StartTime_df2, 'yyyy-MM-dd'))
                        & (df1.Work == df_common.Work_df2), 'inner')
df1_filtered = df1_filtered.withColumn('Type', lit('10'))

# Tag the matching df2 records with Type 11
df2_filtered = df2.join(df_common, (df2.EmpID == df_common.EmpID_df2)
                        & (date_format(df2.StartTime, 'yyyy-MM-dd') == date_format(df_common.StartTime_df2, 'yyyy-MM-dd'))
                        & (df2.Work == df_common.Work_df2), 'inner')
df2_filtered = df2_filtered.withColumn('Type', lit('11'))

# Combine the filtered dataframes
df_combined_filtered = df1_filtered.union(df2_filtered)

# Select the required columns
df_combined_filtered = df_combined_filtered.select('EmpID', 'Work', 'StartTime', 'StopTime', 'Type')

df_combined_filtered.show()

EmpID  Work  StartTime                     StopTime                      Type
1      Meal  2023-06-02T12:10:00.000+0000  2023-06-02T13:00:00.000+0000  10
1      Meal  2023-06-02T12:00:00.000+0000  2023-06-02T13:00:00.000+0000  11
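
One thing to watch with these inner joins: if df1 ever held several rows with the same Work for the same EmpID and day, each match would multiply the output rows. Below is a duplicate-safe sketch using distinct keys and left_semi joins, again assuming matching is only on EmpID, calendar day, and Work (the day column and the common_keys / *_tagged names are mine):

from pyspark.sql.functions import to_date, lit

# Distinct (EmpID, Work, day) combinations that occur in both dataframes
common_keys = (
    df1.select('EmpID', 'Work', to_date('StartTime').alias('day'))
       .intersect(df2.select('EmpID', 'Work', to_date('StartTime').alias('day')))
)

# left_semi keeps each matching row exactly once, so rows are never multiplied
df1_tagged = (df1.withColumn('day', to_date('StartTime'))
                 .join(common_keys, ['EmpID', 'Work', 'day'], 'left_semi')
                 .drop('day')
                 .withColumn('Type', lit('10')))
df2_tagged = (df2.withColumn('day', to_date('StartTime'))
                 .join(common_keys, ['EmpID', 'Work', 'day'], 'left_semi')
                 .drop('day')
                 .withColumn('Type', lit('11')))

df1_tagged and df2_tagged can then be unioned exactly as df1_filtered and df2_filtered are above.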

Step 3

Union the outputs from step 1 and step 2.

# Combine the df2_not_in_df1 dataframe with df_combined_filtered
df_combined = df_combined_filtered.union(df2_not_in_df1)

# Show the output
df_combined.show()

EmpID  Work         StartTime                     StopTime                      Type
1      Meal         2023-06-02T12:10:00.000+0000  2023-06-02T13:00:00.000+0000  10
1      Meal         2023-06-02T12:00:00.000+0000  2023-06-02T13:00:00.000+0000  11
1      Planned OOF  2023-06-02T10:00:00.000+0000  2023-06-02T12:00:00.000+0000  00
1      Planned OOF  2023-06-02T13:00:00.000+0000  2023-06-02T19:00:00.000+0000  00
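
A note on the final union: union matches columns by position, not by name. Both inputs here end with the same EmpID, Work, StartTime, StopTime, Type layout after the select calls, so positional matching is safe; if those select lists could ever drift apart, unionByName is the more defensive choice:

# Same result, but columns are matched by name rather than by position
df_combined = df_combined_filtered.unionByName(df2_not_in_df1)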
