Reputation: 572
How can I implement linear interpolation between two PySpark DataFrames representing data for different years, say 2020 and 2030, to generate a new PySpark DataFrame for an intermediary year like 2025? Both DataFrames have an identical structure with numeric values and the same per-year granularity.
My initial approach was to use pyspark.pandas.DataFrame.interpolate (https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.interpolate.html). Is this the recommended way?
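For reference, a minimal sketch of what I had in mind, assuming Spark >= 3.4 (where pyspark.pandas supports interpolate) and made-up column names:

import numpy as np
import pyspark.pandas as ps

# One row per year, NaN for the years in between, then a linear fill.
psdf = ps.DataFrame(
    {"value1": [10.0] + [np.nan] * 9 + [40.0]},
    index=np.arange(2020, 2031),
)
psdf = psdf.interpolate()  # method="linear" is the default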
I wrote this Pandas method a while back, but now I need to migrate to PySpark, and I am struggling to implement the same logic there.
import numpy as np
import pandas as pd
from pandas import DataFrame


def interpolate_between_years(first: DataFrame, second: DataFrame) -> DataFrame:
    # Each input frame holds one year of data and carries a DatetimeIndex,
    # from which the year is read.
    years = [first.index.year[0], second.index.year[0]]
    interpolated_df = (
        pd.concat(
            [first.reset_index(drop=True), second.reset_index(drop=True)],
            keys=years,
        )
        # Pivot so the year becomes a flat row index, insert the missing
        # years as NaN rows, then fill them in linearly.
        .unstack()
        .reindex(np.arange(years[0], years[1] + 1))
        .interpolate()
    )
    return interpolated_df
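It is called like this (hypothetical single-row frames, each indexed by a timestamp in its year):

first = pd.DataFrame(
    {"value1": [10.0], "value2": [20.0]},
    index=pd.to_datetime(["2020-01-01"]),
)
second = pd.DataFrame(
    {"value1": [40.0], "value2": [50.0]},
    index=pd.to_datetime(["2030-01-01"]),
)
print(interpolate_between_years(first, second))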
Upvotes: 1
Views: 137
Reputation: 2149
You may use plain PySpark (no Pandas API on Spark needed) as follows to do the linear interpolation:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def interpolate_between_years(first: DataFrame, second: DataFrame) -> DataFrame:
    # Anchor years and values are read from the first row of each frame;
    # any additional rows are not interpolated.
    year_first = first.select("year").first()[0]
    year_second = second.select("year").first()[0]
    years_range = range(year_first, year_second + 1)

    # One row per year between the two anchors
    # (uses the global `spark` session created below).
    interpolated_df = spark.createDataFrame(
        [(year,) for year in years_range], ["year"]
    )

    # Linear weight: 0.0 at year_first, 1.0 at year_second.
    factor = (col("year") - year_first) / (year_second - year_first)
    for col_name in first.columns:
        if col_name != "year":
            first_val = first.select(col_name).first()[0]
            second_val = second.select(col_name).first()[0]
            diff = second_val - first_val
            interpolated_df = interpolated_df.withColumn(
                col_name, first_val + diff * factor
            )
    return interpolated_df
spark = SparkSession.builder.getOrCreate()
data_2020 = [(2020, 10, 20), (2021, 11, 21), (2022, 12, 22)]
df_2020 = spark.createDataFrame(data_2020, ["year", "value1", "value2"])
data_2030 = [(2030, 40, 50), (2031, 41, 51), (2032, 42, 52)]
df_2030 = spark.createDataFrame(data_2030, ["year", "value1", "value2"])
interpolated_df = interpolate_between_years(df_2020, df_2030)
interpolated_df.show()
Output:
+----+------+------+
|year|value1|value2|
+----+------+------+
|2020| 10.0| 20.0|
|2021| 13.0| 23.0|
|2022| 16.0| 26.0|
|2023| 19.0| 29.0|
|2024| 22.0| 32.0|
|2025| 25.0| 35.0|
|2026| 28.0| 38.0|
|2027| 31.0| 41.0|
|2028| 34.0| 44.0|
|2029| 37.0| 47.0|
|2030| 40.0| 50.0|
+----+------+------+
From this interpolated_df, you may fetch data for any intermediary year like 2025, 2026, and so on.
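For instance, to pull a single year out of it (reusing the col function imported above):

# Select the interpolated row for 2025.
interpolated_df.filter(col("year") == 2025).show()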
Upvotes: 0