Reputation: 1
I have an Excel sheet with formulas that I need to convert into PySpark code,
considering columns A, B, C, D, E, F, G, H and I, where columns F, G, H and I have fixed random numeric values.
Column A has the first row as NULL, and subsequent rows follow the formula "=F3+G3+((C2+D2+E2)/2)".
Column B has the first row as 1000, and subsequent rows follow the formula "=A3+(B2/2)".
Column C follows the formula "=$B2*5+(100/2)".
Column D follows the formula "=$B2*5+(10/2)".
Column E follows the formula "=$B2*5+(1/2)".
Could you write PySpark code for the same?
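Put differently, once the spreadsheet row offsets are dropped, every row is a recurrence on the previous row's B, C, D and E. A minimal plain-Python sketch of that recurrence (the f_vals and g_vals lists below are only illustrative placeholders for the fixed values in F and G):
# Plain-Python sketch of the row-wise recurrence described above (illustrative values only).
f_vals = [15, 75, 55]   # hypothetical fixed values for column F
g_vals = [62, 77, 20]   # hypothetical fixed values for column G

a, b = None, 1000.0               # first data row: A is NULL, B starts at 1000
c = b * 5 + 100 / 2               # C = $B*5 + 100/2
d = b * 5 + 10 / 2                # D = $B*5 + 10/2
e = b * 5 + 1 / 2                 # E = $B*5 + 1/2
print(a, b, c, d, e)

for f, g in zip(f_vals[1:], g_vals[1:]):
    a = f + g + (c + d + e) / 2   # A = F + G + (previous C + D + E) / 2
    b = a + b / 2                 # B = A + previous B / 2
    c = b * 5 + 100 / 2
    d = b * 5 + 10 / 2
    e = b * 5 + 1 / 2
    print(a, b, c, d, e)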
Upvotes: 0
Views: 295
Reputation: 2468
This is a PySpark solution using pandas_udf. I'm not sure about the performance.
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import pandas as pd

sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
[0.0, 1000.0, 0.0, 0.0, 0.0, 15, 62, "ITEM147", 1], ## 2nd row in Excel
[0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", 2], ### 3rd row in Excel and so on
[0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", 3],
[0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", 4],
[0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", 5],
[0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", 6],
[0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", 7],
[0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", 8],
[0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", 9],
]
df1Columns = ["A", "B", "C", "D", "E", "F", "G", "Product", "Steps"]
dataframe = sqlContext.createDataFrame(data=data1, schema=df1Columns)
print("Given initial pyspark pandas dataframe")
print(dataframe)
dataframe.printSchema()
def set_next_C(curr_b):
    return curr_b * 5 + (100.0 / 2)

def set_next_D(curr_b):
    return curr_b * 5 + (10.0 / 2)

def set_next_E(curr_b):
    return curr_b * 5 + (1.0 / 2)

def set_next_A(curr_f, curr_g, old_c, old_d, old_e):
    return curr_f + curr_g + ((old_c + old_d + old_e) / 2.0)

def set_next_B(curr_a, old_b):
    return curr_a + (old_b / 2.0)
@F.pandas_udf("string")
def multiply_aggregation( all_cols_series: pd.Series ) -> str :
### Initialization
a_old = all_cols_series[0][0]
b_old = all_cols_series[0][1]
c_old = None
d_old = None
e_old = None
f_old = all_cols_series[0][5]
g_old = all_cols_series[0][6]
## Current will be updated as we go along. At first, they will be initial values
a_current = None
b_current = None
c_current = None
d_current = None
e_current = None
length_series = all_cols_series.size
for cc in range(length_series):
if cc == 0:
## First row is being initialized
c_old = set_next_C(b_old)
d_old = set_next_D(b_old)
e_old = set_next_E(b_old)
continue
### calculate all current values
f_current = all_cols_series[cc][5]
g_current = all_cols_series[cc][6]
a_current = set_next_A(f_current, g_current, c_old, d_old, e_old)
b_current = set_next_B(a_current, b_old)
c_current = set_next_C(b_current)
d_current = set_next_D(b_current)
e_current = set_next_E(b_current)
## put current values into old so that they will be used in future calculation
a_old = a_current
b_old = b_current
c_old = c_current
d_old = d_current
e_old = e_current
list_cols_final = [str(a_old), str(b_old), str(c_old), str(d_old), str(e_old)]
result_string = "@@".join(list_cols_final)
return result_string
windowSpec = Window.orderBy(F.col("Steps").asc()).rowsBetween(Window.unboundedPreceding, 0)
dataframe = dataframe.withColumn("all_cols_array", F.array(["A", "B", "C", "D", "E", "F", "G" ]))
calculated_columns_df = dataframe.withColumn("all_rows_calculated", multiply_aggregation(F.col("all_cols_array")).over(windowSpec))
print("Calculated Dataframe : Window Aggregation Function Applied")
calculated_columns_df.show(n=100, truncate=False)
split_df = calculated_columns_df.withColumn("splitted_values", F.split("all_rows_calculated", "@@"))
split_df = split_df.withColumn("A", F.col("splitted_values").getItem(0).cast("float"))
split_df = split_df.withColumn("B", F.col("splitted_values").getItem(1).cast("float"))
split_df = split_df.withColumn("C", F.col("splitted_values").getItem(2).cast("float"))
split_df = split_df.withColumn("D", F.col("splitted_values").getItem(3).cast("float"))
split_df = split_df.withColumn("E", F.col("splitted_values").getItem(4).cast("float"))
split_df = split_df.drop(*["all_rows_calculated", "all_cols_array", "splitted_values"])
print("Final Calculated Values")
split_df.show(n=100, truncate=False)
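If the data ever contains more than one Product, the running calculation should presumably restart per item; a minimal sketch of that variation (assuming Steps orders the rows within each Product) is to partition the window:
# Sketch: restart the running calculation for each Product by partitioning the window.
windowSpec = Window.partitionBy("Product") \
    .orderBy(F.col("Steps").asc()) \
    .rowsBetween(Window.unboundedPreceding, 0)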
Output :
Given initial pyspark dataframe
DataFrame[A: double, B: double, C: double, D: double, E: double, F: bigint, G: bigint, Product: string, Steps: bigint]
root
|-- A: double (nullable = true)
|-- B: double (nullable = true)
|-- C: double (nullable = true)
|-- D: double (nullable = true)
|-- E: double (nullable = true)
|-- F: long (nullable = true)
|-- G: long (nullable = true)
|-- Product: string (nullable = true)
|-- Steps: long (nullable = true)
Calculated Dataframe : Window Aggregation Function Applied
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|A |B |C |D |E |F |G |Product|Steps|all_cols_array |all_rows_calculated |
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|0.0|1000.0|0.0|0.0|0.0|15 |62 |ITEM147|1 |[0.0, 1000.0, 0.0, 0.0, 0.0, 15.0, 62.0]|0.0@@1000.0@@5050.0@@5005.0@@5000.5 |
|0.0|0.0 |0.0|0.0|0.0|75 |77 |ITEM147|2 |[0.0, 0.0, 0.0, 0.0, 0.0, 75.0, 77.0] |7679.75@@8179.75@@40948.75@@40903.75@@40899.25 |
|0.0|0.0 |0.0|0.0|0.0|55 |20 |ITEM147|3 |[0.0, 0.0, 0.0, 0.0, 0.0, 55.0, 20.0] |61450.875@@65540.75@@327753.75@@327708.75@@327704.25 |
|0.0|0.0 |0.0|0.0|0.0|14 |81 |ITEM147|4 |[0.0, 0.0, 0.0, 0.0, 0.0, 14.0, 81.0] |491678.375@@524448.75@@2622293.75@@2622248.75@@2622244.25 |
|0.0|0.0 |0.0|0.0|0.0|91 |44 |ITEM147|5 |[0.0, 0.0, 0.0, 0.0, 0.0, 91.0, 44.0] |3933528.375@@4195752.75@@20978813.75@@20978768.75@@20978764.25 |
|0.0|0.0 |0.0|0.0|0.0|19 |86 |ITEM147|6 |[0.0, 0.0, 0.0, 0.0, 0.0, 19.0, 86.0] |31468278.375@@33566154.75@@167830823.75@@167830778.75@@167830774.25 |
|0.0|0.0 |0.0|0.0|0.0|63 |94 |ITEM147|7 |[0.0, 0.0, 0.0, 0.0, 0.0, 63.0, 94.0] |251746345.375@@268529422.75@@1342647163.75@@1342647118.75@@1342647114.25 |
|0.0|0.0 |0.0|0.0|0.0|15 |55 |ITEM147|8 |[0.0, 0.0, 0.0, 0.0, 0.0, 15.0, 55.0] |2013970768.375@@2148235479.75@@10741177448.75@@10741177403.75@@10741177399.25 |
|0.0|0.0 |0.0|0.0|0.0|33 |48 |ITEM147|9 |[0.0, 0.0, 0.0, 0.0, 0.0, 33.0, 48.0] |16111766206.875@@17185883946.75@@85929419783.75@@85929419738.75@@85929419734.25|
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
Final Calculated Values
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|A |B |C |D |E |F |G |Product|Steps|
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|0.0 |1000.0 |5050.0 |5005.0 |5000.5 |15 |62 |ITEM147|1 |
|7679.75 |8179.75 |40948.75 |40903.75 |40899.25 |75 |77 |ITEM147|2 |
|61450.875 |65540.75 |327753.75 |327708.75 |327704.25 |55 |20 |ITEM147|3 |
|491678.38 |524448.75 |2622293.8 |2622248.8 |2622244.2 |14 |81 |ITEM147|4 |
|3933528.5 |4195753.0 |2.0978814E7 |2.0978768E7 |2.0978764E7 |91 |44 |ITEM147|5 |
|3.1468278E7 |3.3566156E7 |1.67830816E8 |1.67830784E8 |1.67830768E8 |19 |86 |ITEM147|6 |
|2.51746352E8 |2.68529408E8 |1.34264717E9 |1.34264717E9 |1.34264717E9 |63 |94 |ITEM147|7 |
|2.01397082E9 |2.14823552E9 |1.07411773E10|1.07411773E10|1.07411773E10|15 |55 |ITEM147|8 |
|1.61117665E10|1.71858842E10|8.5929419E10 |8.5929419E10 |8.5929419E10 |33 |48 |ITEM147|9 |
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
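As a quick sanity check, the second output row (Steps = 2) can be reproduced by hand from the Excel formulas, using the first row's results (B = 1000, C = 5050, D = 5005, E = 5000.5) and that row's F = 75, G = 77:
# Hand calculation of the Steps = 2 row from the formulas (matches the output above).
a = 75 + 77 + (5050.0 + 5005.0 + 5000.5) / 2   # 7679.75
b = a + 1000.0 / 2                             # 8179.75
c = b * 5 + 100.0 / 2                          # 40948.75
d = b * 5 + 10.0 / 2                           # 40903.75
e = b * 5 + 1.0 / 2                            # 40899.25
print(a, b, c, d, e)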
Upvotes: 0
Reputation: 2468
You can use the pandas API on Spark to achieve this. However, the calculations depend on previously computed values in complex ways, so it might be slow. Here's an example solution.
from pyspark import SparkContext, SQLContext
import pyspark.pandas as ps

sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
[0.0, 1000, 0.0, 0.0, 0.0, 15, 62, "ITEM147", "1"], ## 2nd row in Excel
[0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", "2"], ### 3rd row in Excel and so on
[0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", "3"],
[0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", "4"],
[0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", "5"],
[0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", "6"],
[0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", "7"],
[0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", "8"],
[0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", "9"],
]
df1Columns = ["A","B","C","D","E","F","G","Product","Steps"]
pyspark_pandas_dataframe = ps.DataFrame(data=data1, columns=df1Columns)
print("Given initial pyspark pandas dataframe")
print(pyspark_pandas_dataframe)
print("Datatypes of the columns")
print(pyspark_pandas_dataframe.dtypes)
def set_next_C(pdf, kk):
    pdf.loc[kk, "C"] = pdf.loc[kk, "B"] * 5 + (100.0 / 2)

def set_next_D(pdf, kk):
    pdf.loc[kk, "D"] = pdf.loc[kk, "B"] * 5 + (10.0 / 2)

def set_next_E(pdf, kk):
    pdf.loc[kk, "E"] = pdf.loc[kk, "B"] * 5 + (1.0 / 2)

def set_next_A(pdf, kk):
    pdf.loc[kk, "A"] = pdf.loc[kk, "F"] + pdf.loc[kk, "G"] + ((pdf.loc[kk - 1, "C"] + pdf.loc[kk - 1, "D"] + pdf.loc[kk - 1, "E"]) / 2.0)

def set_next_B(pdf, kk):
    pdf.loc[kk, "B"] = pdf.loc[kk, "A"] + (pdf.loc[kk - 1, "B"] / 2.0)
print("shape / dimensions of dataframe", pyspark_pandas_dataframe.shape)
row_num, col_num = pyspark_pandas_dataframe.shape
print(f"row_num : {row_num}, col_num : {col_num}")
for cc in range(col_num):
if cc == 0 :
set_next_C(pyspark_pandas_dataframe, cc)
set_next_D(pyspark_pandas_dataframe, cc)
set_next_E(pyspark_pandas_dataframe, cc)
continue
set_next_A(pyspark_pandas_dataframe, cc)
set_next_B(pyspark_pandas_dataframe, cc)
set_next_C(pyspark_pandas_dataframe, cc)
set_next_D(pyspark_pandas_dataframe, cc)
set_next_E(pyspark_pandas_dataframe, cc)
print("FINAL Result Dataframe")
print(pyspark_pandas_dataframe)
Output :
Given initial pyspark pandas dataframe
A B C D E F G Product Steps
0 0.0 1000.0 0.0 0.0 0.0 15 62 ITEM147 1
1 0.0 0.0 0.0 0.0 0.0 75 77 ITEM147 2
2 0.0 0.0 0.0 0.0 0.0 55 20 ITEM147 3
3 0.0 0.0 0.0 0.0 0.0 14 81 ITEM147 4
4 0.0 0.0 0.0 0.0 0.0 91 44 ITEM147 5
5 0.0 0.0 0.0 0.0 0.0 19 86 ITEM147 6
6 0.0 0.0 0.0 0.0 0.0 63 94 ITEM147 7
7 0.0 0.0 0.0 0.0 0.0 15 55 ITEM147 8
8 0.0 0.0 0.0 0.0 0.0 33 48 ITEM147 9
Datatypes of the columns
A float64
B float64
C float64
D float64
E float64
F int64
G int64
Product object
Steps object
dtype: object
shape / dimensions of dataframe (9, 9)
row_num : 9, col_num : 9
FINAL Result Dataframe
A B C D E F G Product Steps
0 0.000000e+00 1.000000e+03 5.050000e+03 5.005000e+03 5.000500e+03 15 62 ITEM147 1
1 7.679750e+03 8.179750e+03 4.094875e+04 4.090375e+04 4.089925e+04 75 77 ITEM147 2
2 6.145088e+04 6.554075e+04 3.277538e+05 3.277088e+05 3.277042e+05 55 20 ITEM147 3
3 4.916784e+05 5.244488e+05 2.622294e+06 2.622249e+06 2.622244e+06 14 81 ITEM147 4
4 3.933528e+06 4.195753e+06 2.097881e+07 2.097877e+07 2.097876e+07 91 44 ITEM147 5
5 3.146828e+07 3.356615e+07 1.678308e+08 1.678308e+08 1.678308e+08 19 86 ITEM147 6
6 2.517463e+08 2.685294e+08 1.342647e+09 1.342647e+09 1.342647e+09 63 94 ITEM147 7
7 2.013971e+09 2.148235e+09 1.074118e+10 1.074118e+10 1.074118e+10 15 55 ITEM147 8
8 1.611177e+10 1.718588e+10 8.592942e+10 8.592942e+10 8.592942e+10 33 48 ITEM147 9
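If the result is needed as a regular Spark DataFrame afterwards (for example, to write it out), the pandas-on-Spark frame can be converted back; a minimal sketch:
# Convert the pandas-on-Spark result back to a plain Spark DataFrame for downstream use.
result_spark_df = pyspark_pandas_dataframe.to_spark()
result_spark_df.show(truncate=False)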
Upvotes: 0