Reputation: 332
I have two dataframes df1 and df2.
Below is dataframe df1.
EmpID | Work | StartTime | StopTime |
---|---|---|---|
1 | Call | 2023-06-02T10:00:00.000+0000 | 2023-06-02T12:10:00.000+0000 |
1 | Meal | 2023-06-02T12:10:00.000+0000 | 2023-06-02T13:00:00.000+0000 |
1 | Call | 2023-06-02T13:00:00.000+0000 | 2023-06-02T15:00:00.000+0000 |
1 | Break | 2023-06-02T15:00:00.000+0000 | 2023-06-02T15:15:00.000+0000 |
1 | Call | 2023-06-02T15:15:00.000+0000 | 2023-06-02T17:00:00.000+0000 |
1 | Break | 2023-06-02T17:00:00.000+0000 | 2023-06-02T17:15:00.000+0000 |
1 | Call | 2023-06-02T17:15:00.000+0000 | 2023-06-02T19:00:00.000+0000 |
Below is dataframe df2.
EmpID | Work | StartTime | StopTime |
---|---|---|---|
1 | Planned OOF | 2023-06-02T10:00:00.000+0000 | 2023-06-02T12:00:00.000+0000 |
1 | Meal | 2023-06-02T12:00:00.000+0000 | 2023-06-02T13:00:00.000+0000 |
1 | Planned OOF | 2023-06-02T13:00:00.000+0000 | 2023-06-02T19:00:00.000+0000 |
Expected Output:
1 ) I have to find the records in df2 where the type of 'Work' done for an EmpID on a date is not present in df1 for the same EmpID and the same date.
Example: 'Planned OOF' is present in df2 for EmpID 1 on June 2nd but not in df1 for EmpID 1 on June 2nd. I have to capture the two 'Planned OOF' records from df2 and add '00' as a Type column to them.
2 ) I have to find the common Work in df2 and df1 for the same EmpID on the same day.
Example: the 'Meal' Work is present for EmpID 1 on June 2nd in df2, and the same Work 'Meal' is present for EmpID 1 on June 2nd in df1. I have to capture the record from df1 and assign 10 as Type, and capture the record from df2 and assign 11 as Type.
EmpID | Work | StartTime | StopTime | Type |
---|---|---|---|---|
1 | Planned OOF | 2023-06-02T10:00:00.000+0000 | 2023-06-02T12:00:00.000+0000 | 00 |
1 | Planned OOF | 2023-06-02T13:00:00.000+0000 | 2023-06-02T19:00:00.000+0000 | 00 |
1 | Meal | 2023-06-02T12:10:00.000+0000 | 2023-06-02T13:00:00.000+0000 | 10 |
1 | Meal | 2023-06-02T12:00:00.000+0000 | 2023-06-02T13:00:00.000+0000 | 11 |
Extra: here is the code that builds df1 and df2.
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.getOrCreate()
schema = StructType([
StructField("EmpID", StringType(), True),
StructField("Work", StringType(), True),
StructField("StartTime", StringType(), True),
StructField("StopTime", StringType(), True)
])
data = [
("1", "Call", "2023-06-02T10:00:00.000+0000", "2023-06-02T12:10:00.000+0000"),
("1", "Meal", "2023-06-02T12:10:00.000+0000", "2023-06-02T13:00:00.000+0000"),
("1", "Call", "2023-06-02T13:00:00.000+0000", "2023-06-02T15:00:00.000+0000"),
("1", "Break", "2023-06-02T15:00:00.000+0000", "2023-06-02T15:15:00.000+0000"),
("1", "Call", "2023-06-02T15:15:00.000+0000", "2023-06-02T17:00:00.000+0000"),
("1", "Break", "2023-06-02T17:00:00.000+0000", "2023-06-02T17:15:00.000+0000"),
("1", "Call", "2023-06-02T17:15:00.000+0000", "2023-06-02T19:00:00.000+0000")
]
df1 = spark.createDataFrame(data, schema)
df1 = df1.withColumn("StartTime", to_timestamp("StartTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
df1 = df1.withColumn("StopTime", to_timestamp("StopTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
Below is for df2:
data = [
("1", "Planned OOF", "2023-06-02T10:00:00.000+0000", "2023-06-02T12:00:00.000+0000"),
("1", "Meal", "2023-06-02T12:00:00.000+0000", "2023-06-02T13:00:00.000+0000"),
("1", "Planned OOF", "2023-06-02T13:00:00.000+0000", "2023-06-02T19:00:00.000+0000")
]
df2 = spark.createDataFrame(data, schema)
# Convert StartTime and StopTime columns to timestamp data type
df2 = df2.withColumn("StartTime", to_timestamp("StartTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
df2 = df2.withColumn("StopTime", to_timestamp("StopTime", "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
Upvotes: 0
Views: 250
Reputation: 7156
Step 1:
You can use a left_anti join to find the records in df2 where the 'Work' done for an EmpID on a date is not present in df1 for the same EmpID and the same date, then add '00' as the Type column to those records.
Code:
from pyspark.sql.functions import date_format, col, lit
# Find the records in df2 where the type of 'Work' that is done in df2 for an EmpID on a date is not present in df1 for the Same EmpID and same Date
df2_not_in_df1 = df2.join(df1, (df1.EmpID == df2.EmpID) & (date_format(df1.StartTime, 'yyyy-MM-dd') == date_format(df2.StartTime, 'yyyy-MM-dd')) & (df1.Work == df2.Work), 'left_anti')
# Add '00' as Type column to the filtered records
df2_not_in_df1 = df2_not_in_df1.withColumn('Type', lit('00'))
# Select the required columns
df2_not_in_df1 = df2_not_in_df1.select('EmpID', 'Work', 'StartTime', 'StopTime','Type')
# Show the output
df2_not_in_df1.show()
EmpID | Work | StartTime | StopTime | Type |
---|---|---|---|---|
1 | Planned OOF | 2023-06-02T10:00:00.000+0000 | 2023-06-02T12:00:00.000+0000 | 00 |
1 | Planned OOF | 2023-06-02T13:00:00.000+0000 | 2023-06-02T19:00:00.000+0000 | 00 |
Step 2:
You can use an inner join to find the common Work in df2 and df1 for the same EmpID on the same day, then take the matching record from df1 and assign 10 as Type, and the matching record from df2 and assign 11 as Type.
Code:
# Find common Work in df2 and df1 for Same EmpID on Same Day
# Rename the columns of df2
df2_renamed = df2.select([col(c).alias(c + '_df2') for c in df2.columns])
df_common = df2_renamed.join(df1, (df1.EmpID == df2_renamed.EmpID_df2) & (date_format(df1.StartTime, 'yyyy-MM-dd') == date_format(df2_renamed.StartTime_df2, 'yyyy-MM-dd')) & (df1.Work == df2_renamed.Work_df2), 'inner').select(df2_renamed.columns)
# Add Type column to df1 records
df1_filtered = df1.join(df_common, (df1.EmpID == df_common.EmpID_df2) & (date_format(df1.StartTime, 'yyyy-MM-dd') == date_format(df_common.StartTime_df2, 'yyyy-MM-dd')) & (df1.Work == df_common.Work_df2), 'inner')
df1_filtered = df1_filtered.withColumn('Type', lit('10'))
# Add Type column to df2 records
df2_filtered = df2.join(df_common, (df2.EmpID == df_common.EmpID_df2) & (date_format(df2.StartTime, 'yyyy-MM-dd') == date_format(df_common.StartTime_df2, 'yyyy-MM-dd')) & (df2.Work == df_common.Work_df2), 'inner')
df2_filtered = df2_filtered.withColumn('Type', lit('11'))
# Combine the filtered dataframes
df_combined_filtered = df1_filtered.union(df2_filtered)
# Select the required columns
df_combined_filtered = df_combined_filtered.select('EmpID', 'Work', 'StartTime', 'StopTime', 'Type')
df_combined_filtered.show()
EmpID | Work | StartTime | StopTime | Type |
---|---|---|---|---|
1 | Meal | 2023-06-02T12:10:00.000+0000 | 2023-06-02T13:00:00.000+0000 | 10 |
1 | Meal | 2023-06-02T12:00:00.000+0000 | 2023-06-02T13:00:00.000+0000 | 11 |
Step 3:
Union the outputs from Step 1 and Step 2.
#Combine the df2_not_in_df1 dataframe with df_combined_filtered
df_combined = df_combined_filtered.union(df2_not_in_df1)
# Show the output
df_combined.show()
Upvotes: 2