Reputation: 381
I am pretty new to Spark/PySpark and I am trying to convert some pandas code to PySpark.
The problem in a nutshell: how can I transform some numeric columns of a Spark DataFrame row-wise while preserving the row index values?
I have a DataFrame with several columns serving as the index, while the rest are numerical data I need to run several transforms on:
i0 i1 c0 c1 c2
0 0 A 1.764052 -0.977278 0.144044
1 1 B 0.400157 0.950088 1.454274
2 2 C 0.978738 -0.151357 0.761038
3 3 D 2.240893 -0.103219 0.121675
4 4 E 1.867558 0.410599 0.443863
So columns i0 and i1 are the index, and c0 - c2 are the data.
What I would like to do is apply some transform to the numeric columns (row-wise) but keep the index information.
In the following I am using "subtract the row-wise mean" as an example; the actual operations I need to do are varied and require arbitrary functions. I know you do not need a pandas function to subtract the mean from a Spark DataFrame; I am just using it here as a simplification.
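To make the example concrete, here is what that transform does to the first data row (a minimal, standalone pandas sketch using the numbers from the table above):
import pandas as pd

row = pd.DataFrame([[1.764052, -0.977278, 0.144044]], columns=['c0', 'c1', 'c2'])
# the row-wise mean is about 0.310273; subtracting it gives 1.453780, -1.287551, -0.166229
print(row.sub(row.mean(axis=1), axis=0))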
The setup code is like this:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

# pandas transform: subtract the row-wise mean from every value
def pd_norm(df):
    return df.sub(df.mean(axis=1), axis=0)

# iterator-of-DataFrames wrapper expected by mapInPandas
def pd_someop(it):
    for df in it:
        yield pd_norm(df)

np.random.seed(0)
nrows = 5
ncols = 3

# two index columns (i0, i1) plus ncols columns of random data (c0..c2)
v = pd.DataFrame({'i0' : range(nrows), 'i1' : [chr(65 + i) for i in range(nrows)]})
v = v.join(pd.DataFrame({'c' + str(x) : np.random.normal(size=nrows) for x in range(ncols)}))
vdf = spark.createDataFrame(v)
I can run something like
vdf.select([F.col(x) for x in vdf.columns[2:]]).mapInPandas(pd_someop, schema=vdf.schema[2:]).show()
Which will apply the transform, but there is no guarantee the rows will be returned in order, so I don't know how to match the transformed values back to their index column values.
I cannot pass the index columns along, because I do not want them included in the transform calculation. They may be dates or strings, not simple integer index/row numbers.
In pandas I would do something like
v.iloc[:,:2].join(pd_norm(v.iloc[:,2:]))
Which gives
i0 i1 c0 c1 c2
0 0 A 1.453780 -1.287551 -0.166229
1 1 B -0.534683 0.015249 0.519434
2 2 C 0.449265 -0.680830 0.231565
3 3 D 1.487777 -0.856335 -0.631441
4 4 E 0.960218 -0.496741 -0.463477
I.e. I have the transformed numeric columns with the original indexes.
I also have a fair number of columns (tens to thousands), so solutions where I explicitly hardcode column names are not feasible.
I am really looking for a generic usage pattern where I have several columns that form an index and several hundred columns that need to be transformed by some arbitrary row-wise function.
I thought about adding column metadata to the Spark DataFrame indicating whether a column is an index or not, but this metadata doesn't make it to the pandas functions, so I can't filter the index columns out there.
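Roughly what I had in mind with the metadata idea (an untested sketch): the tags can be read back from the schema, but only on the driver, so I would still end up closing over the column names there.
# tag the index columns with metadata, then recover the names from the schema
tagged = vdf.select(
    *[F.col(c).alias(c, metadata={'index': True}) for c in vdf.columns[:2]],
    *[F.col(c) for c in vdf.columns[2:]]
)
index_cols = [f.name for f in tagged.schema.fields if f.metadata.get('index')]
# index_cols == ['i0', 'i1'], but the pandas function itself never sees the metadata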
I hope this is clear. As I said, I am very new to Spark, so I don't know if I am just missing something obvious. Thanks.
Upvotes: 1
Views: 553
Reputation: 42422
You can change the yield part as below, and change the way you call mapInPandas:
def pd_norm(df):
    return df.sub(df.mean(axis=1), axis=0)

def pd_someop(it):
    for df in it:
        # keep the first two (index) columns untouched and join on the transformed rest
        yield df.iloc[:, :2].join(pd_norm(df.iloc[:, 2:]))

vdf.mapInPandas(pd_someop, schema=vdf.schema).show()
+---+---+------------------+--------------------+--------------------+
| i0| i1| c0| c1| c2|
+---+---+------------------+--------------------+--------------------+
| 0| A|1.4537796668836203| -1.2875505589604548|-0.16622910792316567|
| 1| B|-0.534682502584706|0.015248706573660176| 0.5194337960110459|
| 2| C| 0.449265150454061| -0.680830041949376| 0.23156489149531523|
| 3| D|1.4877767445678818| -0.856335306427134| -0.6314414381407477|
| 4| E|0.9602180818720457|-0.49674140633954944| -0.4634766755324961|
+---+---+------------------+--------------------+--------------------+
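If the index columns are not always the first two, a variant of the same idea (a rough, untested sketch) closes over the index column names instead of slicing positionally:
index_cols = ['i0', 'i1']  # assumed list of index column names

def pd_someop(it):
    for df in it:
        data_cols = [c for c in df.columns if c not in index_cols]
        # transform only the data columns, then reattach the untouched index columns
        yield df[index_cols].join(pd_norm(df[data_cols]))

vdf.mapInPandas(pd_someop, schema=vdf.schema).show()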
Upvotes: 1