Reputation: 2316
I have this code, which I wrote for Random Forest regression and which uses `StringIndexer` to encode the categorical columns. Random Forest regression does not require One Hot Encoding after indexing. Now I want to try Linear Regression, which does require One Hot Encoding. I went through Spark's `OneHotEncoder` documentation but couldn't figure out how to incorporate it into my current code. How can I add a One Hot Encoding step to my current code?
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

# Target column; every other column is treated as a categorical feature.
label_col = "x4"

# Convert the RDD to a DataFrame with named columns.
train_data_df = train_data.toDF(("x0", "x1", "x2", "x3", "x4"))

# Feature columns (everything except the label), computed once so the
# indexer, encoder and assembler stages always agree on the same columns.
feature_cols = [x for x in train_data_df.columns if x != label_col]

# StringIndexer maps each distinct string value to a numeric index.
string_indexers = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in feature_cols
]

# Linear regression would treat raw indices as ordered magnitudes, so each
# index column is expanded into a one-hot vector first.
encoders = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in feature_cols
]

# Assemble the one-hot columns into a single "features" vector.
assembler = VectorAssembler(
    inputCols=["enc_{0}".format(x) for x in feature_cols],
    outputCol="features",
)

# Order matters: index strings -> one-hot encode indices -> assemble.
pipeline = Pipeline(stages=string_indexers + encoders + [assembler])

# Keep this fitted pipeline model. Test data must be transformed with these
# same fitted indexers so the category -> index mapping matches training.
model = pipeline.fit(train_data_df)
indexed = model.transform(train_data_df)

# Convert to an RDD of LabeledPoint for the pyspark.mllib regression API.
# `.rdd.map` works on every Spark version; `DataFrame.map` was removed in 2.0.
# NOTE(review): on Spark 2.x+ VectorAssembler emits pyspark.ml.linalg vectors;
# pyspark.mllib may need them converted via
# pyspark.mllib.linalg.Vectors.fromML(row.features) -- confirm for your version.
label_points = (
    indexed
    .select(col(label_col).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features))
)
UPDATE:
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

###### FOR TEST DATA ######
label_col_test = "x4"

# Convert the RDD to a DataFrame with the same column names as training.
test_data_df = test_data.toDF(("x0", "x1", "x2", "x3", "x4"))

# Reuse the pipeline already fitted on the TRAINING data (`model` from the
# training section) instead of fitting a new one on the test set. StringIndexer
# orders indices by label frequency, so refitting on test data could give the
# same category a different index (and a different one-hot slot) than the one
# the regression weights were learned for.
# NOTE(review): categories never seen during training make the transform fail
# unless the indexers are configured with `handleInvalid` -- confirm for your data.
indexed_test = model.transform(test_data_df)

# Same label cast as the training section ("double").
label_points_test = (
    indexed_test
    .select(col(label_col_test).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features))
)

# Build the model under its own name so the fitted feature pipeline `model`
# is not shadowed.
lr_model = LinearRegressionWithSGD.train(label_points)

valuesAndPreds = label_points_test.map(
    lambda p: (p.label, lr_model.predict(p.features))
)
# Mean of squared errors. Tuple-parameter lambdas (`lambda (v, p): ...`)
# are Python 2 only, so index the pair instead.
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))
Upvotes: 2
Views: 7173
Reputation: 330413
You can simply add a `OneHotEncoder` stage for each indexed column, between the indexing and assembling steps:
from pyspark.ml.feature import OneHotEncoder

# One OneHotEncoder per indexed column: each numeric index produced by the
# matching StringIndexer ("idx_<col>") becomes a one-hot vector ("enc_<col>").
encoders = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in train_data_df.columns if x != label_col
]

# Assemble the one-hot columns (not the raw indices) into the feature vector.
assembler = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x) for x in train_data_df.columns if x != label_col
    ],
    outputCol="features"
)

# Order matters: index strings, then encode the indices, then assemble.
pipeline = Pipeline(stages=string_indexers + encoders + [assembler])
Upvotes: 2