Pfinnn

Reputation: 572

Interpolation between two Dataframes (years with values) in Pyspark

How can I implement linear interpolation between two PySpark DataFrames representing data for different years, say 2020 and 2030, to generate a new PySpark DataFrame for an intermediary year like 2025? Both DataFrames have identical schemas with numeric values, and both years have the same granularity.

My initial approach was to use pyspark.pandas.DataFrame.interpolate: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.interpolate.html
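For reference, this is roughly how I picture using it, assuming I first stack the values into one frame with a row per year (psdf and the column names are made up):

import numpy as np
import pyspark.pandas as ps

# Hypothetical frame: one row per year, values known only for 2020 and 2030.
psdf = ps.DataFrame(
    {
        "value1": [10.0] + [np.nan] * 9 + [40.0],
        "value2": [20.0] + [np.nan] * 9 + [50.0],
    },
    index=np.arange(2020, 2031),
)

# Only method="linear" is supported (Spark 3.4+), and rows are treated as
# evenly spaced, so the frame needs exactly one row per year.
filled = psdf.interpolate()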

Is this the recommended way?

I wrote this Pandas method a while back, but now I need to migrate to PySpark, and I am struggling to implement the same logic there.

import numpy as np
import pandas as pd
from pandas import DataFrame


def interpolate_between_years(first: DataFrame, second: DataFrame) -> DataFrame:
    # Read each frame's year off its DatetimeIndex.
    years = [first.index.year[0], second.index.year[0]]
    interpolated_df = (
        # Stack the two frames side by side, keyed by year.
        pd.concat(
            [first.reset_index(drop=True), second.reset_index(drop=True)],
            keys=years,
            axis=1,
        )
        # Transpose so years run down the index, add a row for every year
        # in between, and fill the gaps by linear interpolation.
        .T.reindex(np.arange(years[0], years[1] + 1))
        .interpolate()
    )
    return interpolated_df
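The closest I have come in PySpark is a round-trip through the pandas API on Spark, which I assume only works while both frames are small enough to collect to the driver (interpolate_between_years_ps is just a sketch of mine):

import pyspark.pandas as ps

# Sketch only: collect both frames to the driver, reuse the pandas method
# above unchanged, then hand the result back to Spark.
def interpolate_between_years_ps(first: ps.DataFrame, second: ps.DataFrame) -> ps.DataFrame:
    interpolated = interpolate_between_years(first.to_pandas(), second.to_pandas())
    return ps.from_pandas(interpolated)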

Upvotes: 1

Views: 137

Answers (1)

Vikas Sharma

Reputation: 2149

You may use the PySpark DataFrame API as follows to do linear interpolation:

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def interpolate_between_years(first: DataFrame, second: DataFrame) -> DataFrame:
    spark = SparkSession.builder.getOrCreate()

    # Anchor the interpolation on the first row of each frame.
    year_first = first.select("year").first()[0]
    year_second = second.select("year").first()[0]

    # Build one row per year between (and including) the two anchor years.
    years_range = range(year_first, year_second + 1)
    interpolated_df = spark.createDataFrame([(year,) for year in years_range], ["year"])

    # Linearly interpolate every value column between its two anchor values.
    for col_name in first.columns:
        if col_name != "year":
            first_val = first.select(col_name).first()[0]
            second_val = second.select(col_name).first()[0]
            diff = second_val - first_val
            factor = (col("year") - year_first) / (year_second - year_first)
            interpolated_df = interpolated_df.withColumn(col_name, first_val + diff * factor)

    return interpolated_df

spark = SparkSession.builder.getOrCreate()

data_2020 = [(2020, 10, 20), (2021, 11, 21), (2022, 12, 22)]
df_2020 = spark.createDataFrame(data_2020, ["year", "value1", "value2"])

data_2030 = [(2030, 40, 50), (2031, 41, 51), (2032, 42, 52)]
df_2030 = spark.createDataFrame(data_2030, ["year", "value1", "value2"])

interpolated_df = interpolate_between_years(df_2020, df_2030)
interpolated_df.show()

Output:

+----+------+------+
|year|value1|value2|
+----+------+------+
|2020|  10.0|  20.0|
|2021|  13.0|  23.0|
|2022|  16.0|  26.0|
|2023|  19.0|  29.0|
|2024|  22.0|  32.0|
|2025|  25.0|  35.0|
|2026|  28.0|  38.0|
|2027|  31.0|  41.0|
|2028|  34.0|  44.0|
|2029|  37.0|  47.0|
|2030|  40.0|  50.0|
+----+------+------+

From this interpolated_df, you can fetch the data for any intermediary year, such as 2025 or 2026.
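For example, filtering on year pulls out a single interpolated row (col is the same pyspark.sql.functions import used above):

interpolated_df.filter(col("year") == 2025).show()

# +----+------+------+
# |year|value1|value2|
# +----+------+------+
# |2025|  25.0|  35.0|
# +----+------+------+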

Upvotes: 0
