Reputation: 208
I have a list and a table like the ones below. I need to go through the values in the item_name column of the table and find any item names that are in the list but missing from the table. For each missing name, I then need to insert a row into the table with a null item_value and the same timestamp as the other rows.
list_of_tags = ["item_1", "item_2", "item_3", "item_4", "item_5", "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e"]
|item_name | item_value | timestamp |
|:------- |:------:| ----------------------------:|
| item_1 | 23.2 | 2023-05-08T20:00:00.000+0000 |
| item_2 | 45.2 | 2023-05-08T20:00:00.000+0000 |
| item_3 | 34.3 | 2023-05-08T20:00:00.000+0000 |
| item_4 | 56.3 | 2023-05-08T20:00:00.000+0000 |
| item_1_a | 23.2 | 2023-05-08T20:00:00.000+0000 |
| item_2_b | 45.2 | 2023-05-08T20:00:00.000+0000 |
| item_3_c | 34.3 | 2023-05-08T20:00:00.000+0000 |
| item_4_d | 56.3 | 2023-05-08T20:00:00.000+0000 |
The outcome I want is:
|item_name | item_value | timestamp |
|:------ |:------------:| ----------------------------:|
| item_1 | 23.2 | 2023-05-08T20:00:00.000+0000 |
| item_2 | 45.2 | 2023-05-08T20:00:00.000+0000 |
| item_3 | 34.3 | 2023-05-08T20:00:00.000+0000 |
| item_4 | 56.3 | 2023-05-08T20:00:00.000+0000 |
| item_5 | null | 2023-05-08T20:00:00.000+0000 |
| item_1_a | 23.2 | 2023-05-08T20:00:00.000+0000 |
| item_2_b | 45.2 | 2023-05-08T20:00:00.000+0000 |
| item_3_c | 34.3 | 2023-05-08T20:00:00.000+0000 |
| item_4_d | 56.3 | 2023-05-08T20:00:00.000+0000 |
| item_5_e | null | 2023-05-08T20:00:00.000+0000 |
How can I do this using PySpark?
Any help is greatly appreciated.
Upvotes: 0
Views: 214
Reputation: 3250
You can add the missing item names to the original DataFrame using the unionByName function. Here is how you can do it:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_timestamp, col
from pyspark.sql.types import StringType, DoubleType, TimestampType, StructType, StructField
The imports above bring in the data types and functions used in the rest of the code.
Create a DataFrame from the given table:
spark = SparkSession.builder.getOrCreate()
# Only the rows that exist in the question's table; item_5 and item_5_e are
# left out on purpose so that the code below has something to add.
data = [
    ("item_1", 23.2, "2023-05-08T20:00:00.000+0000"),
    ("item_2", 45.2, "2023-05-08T20:00:00.000+0000"),
    ("item_3", 34.3, "2023-05-08T20:00:00.000+0000"),
    ("item_4", 56.3, "2023-05-08T20:00:00.000+0000"),
    ("item_1_a", 23.2, "2023-05-08T20:00:00.000+0000"),
    ("item_2_b", 45.2, "2023-05-08T20:00:00.000+0000"),
    ("item_3_c", 34.3, "2023-05-08T20:00:00.000+0000"),
    ("item_4_d", 56.3, "2023-05-08T20:00:00.000+0000"),
]
df = spark.createDataFrame(data, ["item_name", "item_value", "timestamp"])
Now convert the timestamp column from a string to TimestampType:
df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
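To see what the format string above is describing, here is a rough plain-Python analogy. It is only a sketch: Python's `strptime` directives are not the same as Spark's pattern letters, and the variable names `raw` and `parsed` are made up for this illustration.

```python
from datetime import datetime, timezone

# Same timestamp string as in the table.
raw = "2023-05-08T20:00:00.000+0000"

# %f reads the fractional seconds and %z reads the "+0000" offset,
# playing roughly the role of "SSS" and "Z" in the Spark pattern.
parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%f%z")

print(parsed.isoformat())  # 2023-05-08T20:00:00+00:00
```

The value is timezone-aware, with a UTC offset of zero.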
Create a DataFrame for the missing tags, starting from the list of tags:
list_of_tags = [
    "item_1", "item_2", "item_3", "item_4", "item_5",
    "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e"
]
schema = StructType([
    StructField("item_name", StringType(), nullable=False),
    StructField("item_value", DoubleType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=False)
])
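# Collect the names already in the table into a set for fast membership checks
existing_items = {row.item_name for row in df.select("item_name").distinct().collect()}
# Reuse the timestamp of an existing row (all rows share the same one here)
existing_timestamp = df.select("timestamp").first()[0]
# Tags that are in the list but not in the table, in list order
missing_items = [item for item in list_of_tags if item not in existing_items]
# One row per missing tag, with a null item_value
missing_items_df = spark.createDataFrame([(item, None, existing_timestamp) for item in missing_items], schema)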
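The "which tags are missing" step can be checked without a Spark session. The following is a minimal plain-Python sketch of that logic only; the helper name `find_missing` and the variable `shared_ts` are hypothetical and not part of the original code.

```python
from datetime import datetime, timezone

def find_missing(tags, existing_names):
    """Return the tags that are absent from existing_names, in tag order."""
    existing = set(existing_names)  # set lookup instead of scanning a list
    return [tag for tag in tags if tag not in existing]

tags = ["item_1", "item_2", "item_3", "item_4", "item_5",
        "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e"]
existing_names = ["item_1", "item_2", "item_3", "item_4",
                  "item_1_a", "item_2_b", "item_3_c", "item_4_d"]

# Assumption: every row shares this one timestamp, as in the question.
shared_ts = datetime(2023, 5, 8, 20, 0, tzinfo=timezone.utc)

missing = find_missing(tags, existing_names)
# Rows to append: (item_name, item_value, timestamp) with a null value.
missing_rows = [(name, None, shared_ts) for name in missing]

print(missing)  # ['item_5', 'item_5_e']
```

If the table held several different timestamps, a single shared timestamp would no longer be enough and the missing tags would have to be worked out per timestamp.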
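Add the missing item names to the original DataFrame. The new rows already carry a null item_value, so no fillna step is needed (item_value is a double column, and filling it with the string "null" would not produce a meaningful value):
updated_df = df.unionByName(missing_items_df)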
Show the updated DataFrame (display is a Databricks notebook helper and is not part of open-source PySpark, so show is used here):
updated_df.show(truncate=False)
Upvotes: 0