Laure D
Laure D

Reputation: 887

Use dataframes for Decision tree classifier in spark with string fields

I have managed to get my Decision Tree classifier work for the RDD-based API, but now I am trying to switch to the Dataframes-based API in Spark.

I have a dataset like this (but with many more fields) :

country, destination, duration, label

Belgium, France, 10, 0
Bosnia, USA, 120, 1
Germany, Spain, 30, 0

First I load my csv file in a dataframe :

val data = session.read
  .format("org.apache.spark.csv")
  .option("header", "true")
  .csv("/home/Datasets/data/dataset.csv")

Then I transform string columns into numerical columns

val stringColumns = Array("country", "destination")

val index_transformers = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

Then I assemble all my features into one single vector, using VectorAssembler, like this :

val assembler = new VectorAssembler()
   .setInputCols(Array("country_index", "destination_index", "duration_index"))
   .setOutputCol("features")

I split my data into training and test :

val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

Then I create my DecisionTree Classifier

val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

Then I use a pipeline to make all the transformations

val pipeline = new Pipeline()
  .setStages(Array(index_transformers, assembler, dt))

I train my model and use it for predictions :

val model = pipeline.fit(trainingData)

val predictions = model.transform(testData)

But I get some mistakes I don't understand :

When I run my code like that, I have this error :

[error]  found   : Array[org.apache.spark.ml.feature.StringIndexer]
[error]  required: org.apache.spark.ml.PipelineStage
[error]           .setStages(Array(index_transformers, assembler,dt))

So what I did is that I added a pipeline right after the index_transformers val, and right before val assembler :

val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(data)
val df_indexed = index_model.transform(data)

and I use as training set and testing set, my new df_indexed dataframe, and I removed index_transformers from my pipeline with assembler and dt

val Array(trainingData, testData) = df_indexed.randomSplit(Array(0.7, 0.3))

val pipeline = new Pipeline()
  .setStages(Array(assembler,dt))

And I get this error :

Exception in thread "main" java.lang.IllegalArgumentException: Data type StringType is not supported.

It basically says I use VectorAssembler on String, whereas I told it to use it on df_indexed which has now a numerical column_index, but it doesn't seem to use it in vectorAssembler, and i just don't understand..

Thank you

EDIT

Now I have almost managed to get it work :

val data = session.read
  .format("org.apache.spark.csv")
  .option("header", "true")
  .csv("/home/hvfd8529/Datasets/dataOINIS/dataset.csv")

val stringColumns = Array("country_index", "destination_index", "duration_index")

val stringColumns_index = stringColumns.map(c => s"${c}_index")

val index_transformers = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

val assembler  = new VectorAssembler()
    .setInputCols(stringColumns_index)
    .setOutputCol("features")

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")

val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setImpurity("entropy")
  .setMaxBins(1000)
  .setMaxDepth(15)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels())

val stages = index_transformers :+ assembler :+ labelIndexer :+ dt :+ labelConverter

val pipeline = new Pipeline()
  .setStages(stages)


// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "indexedFeatures").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("accuracy = " + accuracy)

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)

except that now I have an error saying this :

value labels is not a member of org.apache.spark.ml.feature.StringIndexer

and I don't understand, as I am following examples on spark doc :/

Upvotes: 2

Views: 2249

Answers (2)

Laure D
Laure D

Reputation: 887

What I did for my first problem :

val stages = index_transformers :+ assembler :+ labelIndexer :+ rf :+ labelConverter

val pipeline = new Pipeline()
  .setStages(stages)

For my second issue with label, I needed to use .fit(data) like this

val labelIndexer = new StringIndexer()
  .setInputCol("label_fraude")
  .setOutputCol("indexedLabel")
  .fit(data)

Upvotes: 0

1e12d9j0j
1e12d9j0j

Reputation: 21

Should be:

val pipeline = new Pipeline()
  .setStages(index_transformers ++ Array(assembler, dt): Array[PipelineStage])

Upvotes: 0

Related Questions