prince13i

Reputation: 1

PySpark: calculate new rows based on previous rows from the current and multiple other columns

I have an Excel sheet with formulas that I need to convert into PySpark code, considering columns A, B, C, D, E, F, G, H, and I, where columns F, G, H, and I hold fixed numeric values.

Column A has NULL in the first row; subsequent rows follow the formula "=F3+G3+((C2+D2+E2)/2)".

Column B has 1000 in the first row; subsequent rows follow the formula "=A3+(B2/2)".

Column C follows the formula "=$B2*5+(100/2)".

Column D follows the formula "=$B2*5+(10/2)".

Column E follows the formula "=$B2*5+(1/2)".
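
In plain Python terms, the recurrence chains row by row roughly like this (an illustrative sketch only; f_vals and g_vals are hypothetical lists holding the fixed F and G values):

# Illustrative sketch: each row's A..E depends on the previous row's B, C, D, E.
def run_recurrence(f_vals, g_vals, b_first=1000.0):
    rows = []
    a, b = None, b_first                  # first row: A is NULL, B is 1000
    c = b * 5 + 100 / 2                   # =$B2*5+(100/2)
    d = b * 5 + 10 / 2                    # =$B2*5+(10/2)
    e = b * 5 + 1 / 2                     # =$B2*5+(1/2)
    rows.append((a, b, c, d, e))
    for f, g in zip(f_vals[1:], g_vals[1:]):
        a = f + g + (c + d + e) / 2       # =F3+G3+((C2+D2+E2)/2)
        b = a + rows[-1][1] / 2           # =A3+(B2/2)
        c, d, e = b * 5 + 50, b * 5 + 5, b * 5 + 0.5
        rows.append((a, b, c, d, e))
    return rows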

Screenshot of the Excel data

Could you write PySpark code for the same?

Upvotes: 0

Views: 295

Answers (2)

user238607

Reputation: 2468

This is a PySpark solution using a pandas_udf over a window. I'm not sure about its performance.

from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import pandas as pd


sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [

    [0.0, 1000.0, 0.0, 0.0, 0.0, 15, 62, "ITEM147", 1], ## 2nd row in Excel
    [0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", 2], ### 3rd row in Excel and so on
    [0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", 3],
    [0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", 4],
    [0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", 5],
    [0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", 6],
    [0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", 7],
    [0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", 8],
    [0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", 9],

]

df1Columns = ["A", "B", "C", "D", "E", "F", "G", "Product", "Steps"]

dataframe = sqlContext.createDataFrame(data=data1, schema=df1Columns)
print("Given initial pyspark pandas dataframe")
print(dataframe)

dataframe.printSchema()



# Helpers mirroring the Excel formulas.
def set_next_C(curr_b):
    return curr_b * 5 + (100.0 / 2)

def set_next_D(curr_b):
    return curr_b * 5 + (10.0 / 2)

def set_next_E(curr_b):
    return curr_b * 5 + (1.0 / 2)

def set_next_A(curr_f, curr_g, old_c, old_d, old_e):
    return curr_f + curr_g + ((old_c + old_d + old_e) / 2.0)

def set_next_B(curr_a, old_b):
    return curr_a + (old_b / 2.0)


@F.pandas_udf("string")
def multiply_aggregation( all_cols_series: pd.Series ) -> str :

    ### Initialization from the first row of the window (Excel row 2):
    ### A and B are given there; C, D, E are derived from B on the first pass.
    a_old = all_cols_series[0][0]
    b_old = all_cols_series[0][1]
    c_old = None
    d_old = None
    e_old = None

    length_series = all_cols_series.size


    for cc in range(length_series):

        if cc == 0:
            ## First row is being initialized
            c_old =  set_next_C(b_old)
            d_old =  set_next_D(b_old)
            e_old =  set_next_E(b_old)
            continue


        ### calculate all current values
        f_current = all_cols_series[cc][5]
        g_current = all_cols_series[cc][6]
        a_current = set_next_A(f_current, g_current, c_old, d_old, e_old)
        b_current = set_next_B(a_current, b_old)
        c_current = set_next_C(b_current)
        d_current = set_next_D(b_current)
        e_current = set_next_E(b_current)

        ## put current values into old so that they will be used in future calculation
        a_old = a_current
        b_old = b_current
        c_old = c_current
        d_old = d_current
        e_old = e_current


    list_cols_final = [str(a_old), str(b_old), str(c_old), str(d_old), str(e_old)]
    result_string = "@@".join(list_cols_final)


    return result_string




# Growing window: from the first row up to and including the current row, ordered by Steps.
windowSpec = Window.orderBy(F.col("Steps").asc()).rowsBetween(Window.unboundedPreceding, 0)

dataframe = dataframe.withColumn("all_cols_array", F.array(["A", "B", "C", "D", "E", "F", "G" ]))

calculated_columns_df = dataframe.withColumn("all_rows_calculated", multiply_aggregation(F.col("all_cols_array")).over(windowSpec))
print("Calculated Dataframe : Window Aggregation Function Applied")
calculated_columns_df.show(n=100, truncate=False)


# Split the "@@"-joined result string back into typed columns.
split_df = calculated_columns_df.withColumn("splitted_values", F.split("all_rows_calculated", "@@"))

split_df = split_df.withColumn("A", F.col("splitted_values").getItem(0).cast("float"))
split_df = split_df.withColumn("B", F.col("splitted_values").getItem(1).cast("float"))
split_df = split_df.withColumn("C", F.col("splitted_values").getItem(2).cast("float"))
split_df = split_df.withColumn("D", F.col("splitted_values").getItem(3).cast("float"))
split_df = split_df.withColumn("E", F.col("splitted_values").getItem(4).cast("float"))

split_df = split_df.drop(*["all_rows_calculated", "all_cols_array", "splitted_values"])

print("Final Calculated Values")
split_df.show(n=100, truncate=False)

Output:

Given initial PySpark dataframe
DataFrame[A: double, B: double, C: double, D: double, E: double, F: bigint, G: bigint, Product: string, Steps: bigint]
root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)
 |-- C: double (nullable = true)
 |-- D: double (nullable = true)
 |-- E: double (nullable = true)
 |-- F: long (nullable = true)
 |-- G: long (nullable = true)
 |-- Product: string (nullable = true)
 |-- Steps: long (nullable = true)

Calculated Dataframe : Window Aggregation Function Applied

+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|A  |B     |C  |D  |E  |F  |G  |Product|Steps|all_cols_array                          |all_rows_calculated                                                            |
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+
|0.0|1000.0|0.0|0.0|0.0|15 |62 |ITEM147|1    |[0.0, 1000.0, 0.0, 0.0, 0.0, 15.0, 62.0]|0.0@@1000.0@@5050.0@@5005.0@@5000.5                                            |
|0.0|0.0   |0.0|0.0|0.0|75 |77 |ITEM147|2    |[0.0, 0.0, 0.0, 0.0, 0.0, 75.0, 77.0]   |7679.75@@8179.75@@40948.75@@40903.75@@40899.25                                 |
|0.0|0.0   |0.0|0.0|0.0|55 |20 |ITEM147|3    |[0.0, 0.0, 0.0, 0.0, 0.0, 55.0, 20.0]   |61450.875@@65540.75@@327753.75@@327708.75@@327704.25                           |
|0.0|0.0   |0.0|0.0|0.0|14 |81 |ITEM147|4    |[0.0, 0.0, 0.0, 0.0, 0.0, 14.0, 81.0]   |491678.375@@524448.75@@2622293.75@@2622248.75@@2622244.25                      |
|0.0|0.0   |0.0|0.0|0.0|91 |44 |ITEM147|5    |[0.0, 0.0, 0.0, 0.0, 0.0, 91.0, 44.0]   |3933528.375@@4195752.75@@20978813.75@@20978768.75@@20978764.25                 |
|0.0|0.0   |0.0|0.0|0.0|19 |86 |ITEM147|6    |[0.0, 0.0, 0.0, 0.0, 0.0, 19.0, 86.0]   |31468278.375@@33566154.75@@167830823.75@@167830778.75@@167830774.25            |
|0.0|0.0   |0.0|0.0|0.0|63 |94 |ITEM147|7    |[0.0, 0.0, 0.0, 0.0, 0.0, 63.0, 94.0]   |251746345.375@@268529422.75@@1342647163.75@@1342647118.75@@1342647114.25       |
|0.0|0.0   |0.0|0.0|0.0|15 |55 |ITEM147|8    |[0.0, 0.0, 0.0, 0.0, 0.0, 15.0, 55.0]   |2013970768.375@@2148235479.75@@10741177448.75@@10741177403.75@@10741177399.25  |
|0.0|0.0   |0.0|0.0|0.0|33 |48 |ITEM147|9    |[0.0, 0.0, 0.0, 0.0, 0.0, 33.0, 48.0]   |16111766206.875@@17185883946.75@@85929419783.75@@85929419738.75@@85929419734.25|
+---+------+---+---+---+---+---+-------+-----+----------------------------------------+-------------------------------------------------------------------------------+


Final Calculated Values

+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|A            |B            |C            |D            |E            |F  |G  |Product|Steps|
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
|0.0          |1000.0       |5050.0       |5005.0       |5000.5       |15 |62 |ITEM147|1    |
|7679.75      |8179.75      |40948.75     |40903.75     |40899.25     |75 |77 |ITEM147|2    |
|61450.875    |65540.75     |327753.75    |327708.75    |327704.25    |55 |20 |ITEM147|3    |
|491678.38    |524448.75    |2622293.8    |2622248.8    |2622244.2    |14 |81 |ITEM147|4    |
|3933528.5    |4195753.0    |2.0978814E7  |2.0978768E7  |2.0978764E7  |91 |44 |ITEM147|5    |
|3.1468278E7  |3.3566156E7  |1.67830816E8 |1.67830784E8 |1.67830768E8 |19 |86 |ITEM147|6    |
|2.51746352E8 |2.68529408E8 |1.34264717E9 |1.34264717E9 |1.34264717E9 |63 |94 |ITEM147|7    |
|2.01397082E9 |2.14823552E9 |1.07411773E10|1.07411773E10|1.07411773E10|15 |55 |ITEM147|8    |
|1.61117665E10|1.71858842E10|8.5929419E10 |8.5929419E10 |8.5929419E10 |33 |48 |ITEM147|9    |
+-------------+-------------+-------------+-------------+-------------+---+---+-------+-----+
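
As a quick sanity check, the Steps = 2 row can be reproduced by hand from the Steps = 1 row (a small verification snippet; variable names are purely illustrative):

# Recompute the Steps = 2 row from the Steps = 1 values shown above.
c1, d1, e1 = 5050.0, 5005.0, 5000.5   # C, D, E at Steps = 1
b1 = 1000.0                           # B at Steps = 1
f2, g2 = 75, 77                       # fixed F, G at Steps = 2
a2 = f2 + g2 + (c1 + d1 + e1) / 2     # 7679.75
b2 = a2 + b1 / 2                      # 8179.75
c2 = b2 * 5 + 100 / 2                 # 40948.75
print(a2, b2, c2)                     # matches the Steps = 2 output row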

Upvotes: 0

user238607

Reputation: 2468

You will have to use the Pandas API on Spark to achieve this. The calculations depend in complex ways on previously computed values, so it might be slow. Here's an example solution.

from pyspark import SparkContext, SQLContext
import pyspark.pandas as ps



sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [

    [0.0, 1000.0, 0.0, 0.0, 0.0, 15, 62, "ITEM147", "1"], ## 2nd row in Excel
    [0.0, 0.0, 0.0, 0.0, 0.0, 75, 77, "ITEM147", "2"], ### 3rd row in Excel and so on
    [0.0, 0.0, 0.0, 0.0, 0.0, 55, 20, "ITEM147", "3"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 14, 81, "ITEM147", "4"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 91, 44, "ITEM147", "5"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 19, 86, "ITEM147", "6"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 63, 94, "ITEM147", "7"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 15, 55, "ITEM147", "8"],
    [0.0, 0.0, 0.0, 0.0, 0.0, 33, 48, "ITEM147", "9"],

]

df1Columns = ["A","B","C","D","E","F","G","Product","Steps"]

pyspark_pandas_dataframe = ps.DataFrame(data=data1, columns=df1Columns)
print("Given initial pyspark pandas dataframe")
print(pyspark_pandas_dataframe)

print("Datatypes of the columns")
print(pyspark_pandas_dataframe.dtypes)


# Each helper writes one cell of row kk, mirroring the Excel formulas.
def set_next_C(pdf, kk):
    pdf.loc[kk, "C"] = pdf.loc[kk, "B"] * 5 + (100.0 / 2)

def set_next_D(pdf, kk):
    pdf.loc[kk, "D"] = pdf.loc[kk, "B"] * 5 + (10.0 / 2)

def set_next_E(pdf, kk):
    pdf.loc[kk, "E"] = pdf.loc[kk, "B"] * 5 + (1.0 / 2)

def set_next_A(pdf, kk):
    pdf.loc[kk, "A"] = pdf.loc[kk, "F"] + pdf.loc[kk, "G"] + ((pdf.loc[kk - 1, "C"] + pdf.loc[kk - 1, "D"] + pdf.loc[kk - 1, "E"]) / 2.0)

def set_next_B(pdf, kk):
    pdf.loc[kk, "B"] = pdf.loc[kk, "A"] + (pdf.loc[kk - 1, "B"] / 2.0)


print("shape / dimensions of dataframe", pyspark_pandas_dataframe.shape)

row_num, col_num = pyspark_pandas_dataframe.shape

print(f"row_num : {row_num}, col_num : {col_num}")

# Iterate over the rows in order; each row depends on the previous one.
for cc in range(row_num):

    if cc == 0:
        # First row: A and B are given, so only C, D, E need to be derived.
        set_next_C(pyspark_pandas_dataframe, cc)
        set_next_D(pyspark_pandas_dataframe, cc)
        set_next_E(pyspark_pandas_dataframe, cc)
        continue

    set_next_A(pyspark_pandas_dataframe, cc)
    set_next_B(pyspark_pandas_dataframe, cc)
    set_next_C(pyspark_pandas_dataframe, cc)
    set_next_D(pyspark_pandas_dataframe, cc)
    set_next_E(pyspark_pandas_dataframe, cc)


print("FINAL Result Dataframe")
print(pyspark_pandas_dataframe)

Output:

Given initial pyspark pandas dataframe
     A       B    C    D    E   F   G  Product Steps
0  0.0  1000.0  0.0  0.0  0.0  15  62  ITEM147     1
1  0.0     0.0  0.0  0.0  0.0  75  77  ITEM147     2
2  0.0     0.0  0.0  0.0  0.0  55  20  ITEM147     3
3  0.0     0.0  0.0  0.0  0.0  14  81  ITEM147     4
4  0.0     0.0  0.0  0.0  0.0  91  44  ITEM147     5
5  0.0     0.0  0.0  0.0  0.0  19  86  ITEM147     6
6  0.0     0.0  0.0  0.0  0.0  63  94  ITEM147     7
7  0.0     0.0  0.0  0.0  0.0  15  55  ITEM147     8
8  0.0     0.0  0.0  0.0  0.0  33  48  ITEM147     9
Datatypes of the columns
A          float64
B          float64
C          float64
D          float64
E          float64
F            int64
G            int64
Product     object
Steps       object
dtype: object
shape / dimensions of dataframe (9, 9)
row_num : 9, col_num : 9
FINAL Result Dataframe
              A             B             C             D             E   F   G  Product Steps
0  0.000000e+00  1.000000e+03  5.050000e+03  5.005000e+03  5.000500e+03  15  62  ITEM147     1
1  7.679750e+03  8.179750e+03  4.094875e+04  4.090375e+04  4.089925e+04  75  77  ITEM147     2
2  6.145088e+04  6.554075e+04  3.277538e+05  3.277088e+05  3.277042e+05  55  20  ITEM147     3
3  4.916784e+05  5.244488e+05  2.622294e+06  2.622249e+06  2.622244e+06  14  81  ITEM147     4
4  3.933528e+06  4.195753e+06  2.097881e+07  2.097877e+07  2.097876e+07  91  44  ITEM147     5
5  3.146828e+07  3.356615e+07  1.678308e+08  1.678308e+08  1.678308e+08  19  86  ITEM147     6
6  2.517463e+08  2.685294e+08  1.342647e+09  1.342647e+09  1.342647e+09  63  94  ITEM147     7
7  2.013971e+09  2.148235e+09  1.074118e+10  1.074118e+10  1.074118e+10  15  55  ITEM147     8
8  1.611177e+10  1.718588e+10  8.592942e+10  8.592942e+10  8.592942e+10  33  48  ITEM147     9
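
Because each .loc assignment on a pyspark.pandas DataFrame can trigger distributed work, for modest row counts it may be cheaper to collect once, run the loop in plain pandas, and convert back. A minimal sketch under that assumption, reusing the frame built above:

import pyspark.pandas as ps

# Collect to plain pandas, run the sequential recurrence locally, convert back.
pdf = pyspark_pandas_dataframe.to_pandas()

pdf.loc[0, "C"] = pdf.loc[0, "B"] * 5 + 50.0   # first row: only C, D, E are derived
pdf.loc[0, "D"] = pdf.loc[0, "B"] * 5 + 5.0
pdf.loc[0, "E"] = pdf.loc[0, "B"] * 5 + 0.5

for kk in range(1, len(pdf)):
    prev = pdf.loc[kk - 1]
    pdf.loc[kk, "A"] = pdf.loc[kk, "F"] + pdf.loc[kk, "G"] + (prev["C"] + prev["D"] + prev["E"]) / 2.0
    pdf.loc[kk, "B"] = pdf.loc[kk, "A"] + prev["B"] / 2.0
    pdf.loc[kk, "C"] = pdf.loc[kk, "B"] * 5 + 50.0
    pdf.loc[kk, "D"] = pdf.loc[kk, "B"] * 5 + 5.0
    pdf.loc[kk, "E"] = pdf.loc[kk, "B"] * 5 + 0.5

result = ps.from_pandas(pdf)   # back to a pyspark.pandas DataFrame
print(result)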

Upvotes: 0
