MMV

Reputation: 208

How to insert values into a table from a list

I have a list and a table like the ones below. I need to go through the values in the item_name column and find any item name that is in the list but missing from the table. Each missing item_name should then be inserted into the table with a null item_value and the same timestamp as the other rows.

list_of_tags = ["item_1", "item_2", "item_3", "item_4", "item_5", "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e"]

|item_name | item_value | timestamp |
|:-------  |:------:| ----------------------------:|
| item_1   | 23.2   | 2023-05-08T20:00:00.000+0000 |
| item_2   | 45.2   | 2023-05-08T20:00:00.000+0000 |
| item_3   | 34.3   | 2023-05-08T20:00:00.000+0000 |
| item_4   | 56.3   | 2023-05-08T20:00:00.000+0000 |
| item_1_a | 23.2   | 2023-05-08T20:00:00.000+0000 |
| item_2_b | 45.2   | 2023-05-08T20:00:00.000+0000 |
| item_3_c | 34.3   | 2023-05-08T20:00:00.000+0000 |
| item_4_d | 56.3   | 2023-05-08T20:00:00.000+0000 |

The outcome I want is:

|item_name | item_value  | timestamp                    |
|:------   |:------------:| ----------------------------:|
| item_1   | 23.2         | 2023-05-08T20:00:00.000+0000 |
| item_2   | 45.2         | 2023-05-08T20:00:00.000+0000 |
| item_3   | 34.3         | 2023-05-08T20:00:00.000+0000 |
| item_4   | 56.3         | 2023-05-08T20:00:00.000+0000 |
| item_5   | null         | 2023-05-08T20:00:00.000+0000 |
| item_1_a | 23.2         | 2023-05-08T20:00:00.000+0000 |
| item_2_b | 45.2         | 2023-05-08T20:00:00.000+0000 |
| item_3_c | 34.3         | 2023-05-08T20:00:00.000+0000 |
| item_4_d | 56.3         | 2023-05-08T20:00:00.000+0000 |
| item_5_e | null         | 2023-05-08T20:00:00.000+0000 |

How can I do this using PySpark?

Any help is greatly appreciated.

Upvotes: 0

Views: 214

Answers (1)

You can add the missing item names to the original DataFrame using the unionByName function. Here is how you can do it:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_timestamp, col
from pyspark.sql.types import StringType, DoubleType, TimestampType, StructType, StructField

The imports above provide the data types and functions used in the rest of the answer.

Create DataFrame from the given table

spark = SparkSession.builder.getOrCreate() 

# Rows from the question's table; item_5 and item_5_e are absent on purpose
data = [
    ("item_1", 23.2, "2023-05-08T20:00:00.000+0000"),
    ("item_2", 45.2, "2023-05-08T20:00:00.000+0000"),
    ("item_3", 34.3, "2023-05-08T20:00:00.000+0000"),
    ("item_4", 56.3, "2023-05-08T20:00:00.000+0000"),
    ("item_1_a", 23.2, "2023-05-08T20:00:00.000+0000"),
    ("item_2_b", 45.2, "2023-05-08T20:00:00.000+0000"),
    ("item_3_c", 34.3, "2023-05-08T20:00:00.000+0000"),
    ("item_4_d", 56.3, "2023-05-08T20:00:00.000+0000"),
]

df = spark.createDataFrame(data, ["item_name", "item_value", "timestamp"])

Now convert the timestamp column to TimestampType:

df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

Create a DataFrame of the tags that are in the list but missing from the table

list_of_tags = [
    "item_1", "item_2", "item_3", "item_4", "item_5",
    "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e"
]

schema = StructType([
    StructField("item_name", StringType(), nullable=False),
    StructField("item_value", DoubleType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=False)
])


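# Collect the item names already in the table into a set for fast lookups
existing_items = {row.item_name for row in df.select("item_name").distinct().collect()}
# All rows share one timestamp, so take it from the first row
existing_timestamp = df.select("timestamp").first()[0]

# Tags that are in the list but not in the table, kept in list order
missing_items = [item for item in list_of_tags if item not in existing_items]

# One row per missing tag: null item_value, same timestamp as the existing rows
missing_items_df = spark.createDataFrame([(item, None, existing_timestamp) for item in missing_items], schema)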
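The filtering step can be checked in plain Python, without a Spark session. This is only a minimal sketch: `existing_names` is a hypothetical name standing in for the item names collected from the table, and the values are copied from the question.

```python
# Tags to check, using the same names as the rows in the question's table.
list_of_tags = [
    "item_1", "item_2", "item_3", "item_4", "item_5",
    "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e",
]

# Item names already present in the table (hypothetical stand-in for the
# names collected from the DataFrame).
existing_names = {
    "item_1", "item_2", "item_3", "item_4",
    "item_1_a", "item_2_b", "item_3_c", "item_4_d",
}

# Filter the list against the set; the list order is preserved.
missing_items = [tag for tag in list_of_tags if tag not in existing_names]
print(missing_items)  # ['item_5', 'item_5_e']
```

Only these two names should end up in the DataFrame of missing items.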

Add the missing item names to the original DataFrame. The new rows keep a null item_value, so no fillna call is needed

updated_df = df.unionByName(missing_items_df)

Display the updated DataFrame

updated_df.show(truncate=False)


Upvotes: 0
