firdaus

Reputation: 541

Using Spark's MLlib routines with pandas dataframes

I have a pretty big data set (~20GB) stored on disk as a pandas/PyTables HDFStore, and I want to run random forests and boosted trees on it. Trying to do it on my local system takes forever, so I was thinking of farming it out to a Spark cluster I have access to and using MLlib routines instead.

While I have managed to load the pandas dataframe as a Spark dataframe, I'm a little confused about how to use it with the MLlib routines. I'm not too familiar with MLlib, and it seems to accept only the LabeledPoint data type.

I would appreciate any ideas/pointers/code that explain how to use (pandas or Spark) dataframes as input to MLlib algorithms - either directly, or indirectly by converting to supported types.

Thanks.

Upvotes: 0

Views: 1630

Answers (2)

Brij Kishore

Reputation: 1

If y_train and X_train are pandas dataframes, convert them to the MLlib input format as follows:

  1. Convert them to NumPy arrays:

    import numpy as np

    y_train = np.array(y_train)
    X_train = np.array(X_train)

  2. Build a list of LabeledPoint objects:

    from pyspark.mllib.regression import LabeledPoint

    train_data = []
    for i in range(X_train.shape[0]):
        train_data.append(LabeledPoint(y_train[i], X_train[i]))

  3. Parallelize it into an RDD (this needs a SparkContext, which you can get from a SparkSession):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
    sparkContext = spark.sparkContext

    train_data_rdd = sparkContext.parallelize(train_data)

Then you can train a model, for example a gradient-boosted trees regressor:

    from pyspark.mllib.tree import GradientBoostedTrees

    model = GradientBoostedTrees.trainRegressor(train_data_rdd, categoricalFeaturesInfo={}, numIterations=3)
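The label/feature pairing in the per-row loop above can also be done in a single pass with zip. A minimal sketch, using small hypothetical y_train/X_train arrays and plain tuples as stand-ins for LabeledPoint so the pairing logic can be checked without a running Spark cluster (with pyspark available, build LabeledPoint(label, features) instead and wrap the list in sparkContext.parallelize(...)):

```python
import numpy as np

# Hypothetical small stand-ins for y_train / X_train
y_train = np.array([1.0, 0.0, 1.0])
X_train = np.array([[1.0, 2.0],
                    [3.0, 4.0],
                    [5.0, 6.0]])

# Pair each label with its feature row in one pass instead of an index loop.
# With pyspark: [LabeledPoint(label, features) for label, features in zip(...)]
train_pairs = [(label, features) for label, features in zip(y_train, X_train)]
```

Iterating a 1-D NumPy array yields scalars and a 2-D array yields rows, so zip lines up each label with its feature vector.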

Upvotes: -1

Chris

Reputation: 507

You need to convert the DataFrame to an RDD[LabeledPoint]. Note that a LabeledPoint is just a (label: Double, features: Vector). Consider a mapping routine that grabs the values from each row:

val rdd = df.rdd.map { row =>
  LabeledPoint(row.getDouble(0), Vectors.dense(row.getDouble(1), ..., row.getDouble(n)))
}

(Note that row(0) returns Any, so use the typed accessor row.getDouble(0) for the label.) This returns an RDD[LabeledPoint], which you can pass to RandomForest.trainRegressor(...), for example. Have a look at the DataFrame API for details.
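Since the question is about pandas/PySpark, the same idea in Python would be roughly df.rdd.map(parse_row). A sketch of just the row-parsing function, using plain tuples as stand-in rows and a list as the feature vector so it runs without a Spark session (with pyspark installed, return LabeledPoint(label, features) instead of a tuple):

```python
def parse_row(row):
    """Map one row (label first, then feature columns) to (label, features).

    Stand-in for the PySpark version:
        df.rdd.map(lambda row: LabeledPoint(row[0], row[1:]))
    """
    label = float(row[0])
    features = [float(x) for x in row[1:]]
    return label, features

# Hypothetical rows: (label, feature_1, feature_2)
rows = [(1.0, 0.5, 2.0), (0.0, 1.5, 3.0)]
parsed = [parse_row(r) for r in rows]
```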

Upvotes: 1
