dizzy

Reputation: 381

pyspark transform subset of DataFrame cols but preserve index

I am pretty new to Spark/PySpark and I am trying to convert some pandas code to PySpark.

The problem in a nutshell: how can I transform some numeric columns of a Spark DataFrame row-wise while preserving the row index values?

I have a DataFrame where several columns serve as the index, while the rest hold numerical data that I need to run several transforms on:

   i0 i1        c0        c1        c2
0   0  A  1.764052 -0.977278  0.144044
1   1  B  0.400157  0.950088  1.454274
2   2  C  0.978738 -0.151357  0.761038
3   3  D  2.240893 -0.103219  0.121675
4   4  E  1.867558  0.410599  0.443863

So columns i0 and i1 are the index, and c0-c2 are the data.

What I would like to do is apply some transform to the numeric columns (row-wise) but keep the index information.

In the following I use "subtract the row-wise mean" as an example; the actual operations I need to do are varied and require arbitrary functions. I know you do not need a pandas function to subtract the mean with a Spark DataFrame; I am just using it here as a simplification.

The setup code is like this:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

def pd_norm(df):
    # subtract the row-wise mean from every column
    return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
    # mapInPandas passes an iterator of pandas DataFrames, one per batch
    for df in it:
        yield pd_norm(df)

np.random.seed(0)
nrows = 5
ncols = 3
v = pd.DataFrame({'i0' : range(nrows), 'i1' : [chr(65 + i) for i in range(nrows)]})
v = v.join(pd.DataFrame({'c' + str(x) : np.random.normal(size=nrows) for x in range(ncols)}))
vdf = spark.createDataFrame(v)

I can run something like

vdf.select([F.col(x) for x in vdf.columns[2:]]).mapInPandas(pd_someop, schema=vdf.schema[2:]).show()

This applies the transform, but there is no guarantee the rows are returned in their original order, so I don't know how to match the transformed values back up with their index column values.

I cannot pass the index columns in, because I do not want them included in the transform calculation. They may be dates or strings, not simple integer row numbers.

In pandas I would do something like

v.iloc[:,:2].join(pd_norm(v.iloc[:,2:]))

Which gives

   i0 i1        c0        c1        c2
0   0  A  1.453780 -1.287551 -0.166229
1   1  B -0.534683  0.015249  0.519434
2   2  C  0.449265 -0.680830  0.231565
3   3  D  1.487777 -0.856335 -0.631441
4   4  E  0.960218 -0.496741 -0.463477

I.e. I get the transformed numeric columns together with the original index values.

I also have a fair number of columns (tens to thousands), so solutions that explicitly hardcode column names are not feasible.

I am really looking for a generic usage pattern where I have several columns that form an index, and several hundred columns that need to be transformed by some arbitrary row-wise function.

I thought about adding column metadata to the Spark DataFrame indicating whether a column is an index column or not, but this metadata doesn't make it to the pandas functions, so I can't filter the index columns out there.
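
For reference, attaching the metadata is easy enough; it just only seems to be visible on the driver, never inside the pandas function. A rough sketch of what I tried (withMetadata needs Spark 3.3+; on older versions F.col(c).alias(c, metadata=...) does the same):

vdf2 = vdf.withMetadata('i0', {'index': True}).withMetadata('i1', {'index': True})

# the metadata is visible on the driver via the schema...
idx_cols = [f.name for f in vdf2.schema.fields if f.metadata.get('index')]

# ...but each batch inside mapInPandas arrives as a plain pandas DataFrame,
# so pd_someop never sees it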

I hope this is clear. Like I say, I am very new to Spark, so I don't know if I am just missing something obvious. Thanks.

Upvotes: 1

Views: 553

Answers (1)

mck

Reputation: 42422

You can change the yield part as below, and call mapInPandas on the full DataFrame so the index columns are passed through:

def pd_norm(df):
    return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
    for df in it:
        # pass the first two (index) columns through untouched and join
        # them back onto the transformed numeric columns
        yield df.iloc[:, :2].join(pd_norm(df.iloc[:, 2:]))

vdf.mapInPandas(pd_someop, schema=vdf.schema).show()
+---+---+------------------+--------------------+--------------------+
| i0| i1|                c0|                  c1|                  c2|
+---+---+------------------+--------------------+--------------------+
|  0|  A|1.4537796668836203| -1.2875505589604548|-0.16622910792316567|
|  1|  B|-0.534682502584706|0.015248706573660176|  0.5194337960110459|
|  2|  C| 0.449265150454061|  -0.680830041949376| 0.23156489149531523|
|  3|  D|1.4877767445678818|  -0.856335306427134| -0.6314414381407477|
|  4|  E|0.9602180818720457|-0.49674140633954944| -0.4634766755324961|
+---+---+------------------+--------------------+--------------------+
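
If the number of leading index columns varies, the same pattern can be parameterized with a closure or functools.partial instead of hardcoding the 2 (a sketch along the same lines; pd_someop_n and n_idx are just illustrative names):

from functools import partial

def pd_someop_n(it, n_idx):
    for df in it:
        # keep the first n_idx columns untouched, transform the rest
        yield df.iloc[:, :n_idx].join(pd_norm(df.iloc[:, n_idx:]))

n_idx = 2  # number of leading index columns
vdf.mapInPandas(partial(pd_someop_n, n_idx=n_idx), schema=vdf.schema).show()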

Upvotes: 1
