Reputation: 3373
I am performing a naive Bayes classification in Spark/Scala. It seems to work OK; the code is:
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.feature.StringIndexer
val dfLemma2 = dfLemma.withColumn("racist", 'racist.cast("String"))
val indexer = new StringIndexer().setInputCol("racist").setOutputCol("indexracist")
val indexed = indexer.fit(dfLemma2).transform(dfLemma2)
indexed.show()
val hashingTF = new HashingTF()
.setInputCol("lemma").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(indexed)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "indexracist").take(3).foreach(println)
val changedTypedf = rescaledData.withColumn("indexracist", 'indexracist.cast("double"))
changedTypedf.show()
// val labeled = changedTypedf.map(row => LabeledPoint(row(0), row.getAs[Vector](4)))
val labeled = changedTypedf.select("indexracist","features").rdd.map(row => LabeledPoint(
row.getAs[Double]("indexracist"),
org.apache.spark.mllib.linalg.Vectors.fromML(row.getAs[org.apache.spark.ml.linalg.SparseVector]("features"))
))
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils
// Split data into training (60%) and test (40%).
val Array(training, test) = labeled.randomSplit(Array(0.6, 0.4))
val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
predictionAndLabel.take(100)
This outputs:
res330: Array[(Double, Double)] = Array((0.0,0.0), (0.0,0.0), (0.0,0.0), (0.0,0.0),
which I assume is an array of (prediction, label) pairs. What I would like to output is these pairs joined to the original text, which was a column called lemma in the training DataFrame, so something like:
+----------+-----+----------------------+
|Prediction|Label|lemma                 |
+----------+-----+----------------------+
|0.0       |0.0  |[cakes, are, good]    |
|0.0       |0.0  |[jim, says, hi]       |
|1.0       |1.0  |[shut, the, dam, door]|
|...       |...  |...                   |
+----------+-----+----------------------+
Any pointers are appreciated, as my Spark/Scala is weak.
EDIT: The text column is called 'lemma' in 'indexed':
+------+-------------------------------------------------------------------------------------------------------------------+
|racist|lemma |
+------+-------------------------------------------------------------------------------------------------------------------+
|true |[@cllrwood, abbo, @ukip, britainfirst] |
|false |[objectofthemonth, george, lansbury, bust, jussuf, abbo, amp, fascinating, insight, son, jerome] |
|false |[nowplay, one, night, stand, van, brave, @bbraveofficial, bbravesquad, abbo, safe] |
|false |[@mahesh, weet, son, satyamurthy, kante, abbo, chana, better, aaamovie] |
Upvotes: 3
Views: 3932
Reputation: 1
You have to use a Pipeline (from the ml package) so that the output keeps the training columns, such as lemma, and not only the prediction columns.
Upvotes: -2
Reputation: 2434
As some answers said, it's recommended that you use the ml package instead of the mllib package since Spark 2.0.
After rewriting your code using the ml package, the answer to your question is very straightforward: just select the right columns.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{HashingTF, IDF, StringIndexer}
import org.apache.spark.sql.functions.col
// StringIndexer needs a string column, so cast the Boolean label first.
val dfLemma2 = dfLemma.withColumn("racist", col("racist").cast("String"))
// Stage 1: map "true"/"false" to a numeric label index.
val indexer = new StringIndexer().setInputCol("racist").setOutputCol("indexracist")
// Stage 2: hash the lemma tokens into a term-frequency vector.
val hashingTF = new HashingTF()
  .setInputCol("lemma")
  .setOutputCol("rawFeatures")
  .setNumFeatures(20)
// Stage 3: rescale the term frequencies with IDF.
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// Stage 4: multinomial naive Bayes with smoothing 1.0 (lambda = 1.0 in the mllib code).
val naiveBayes = new NaiveBayes()
  .setLabelCol("indexracist")
  .setFeaturesCol("features")
  .setModelType("multinomial")
  .setSmoothing(1.0)
val pipeline = new Pipeline().setStages(Array(indexer, hashingTF, idf, naiveBayes))
val Array(training, test) = dfLemma2.randomSplit(Array(0.6, 0.4))
val model = pipeline.fit(training)
// transform() keeps every input column, so lemma can be selected next to the prediction.
val predictionAndLabel = model.transform(test).select("prediction", "racist", "indexracist", "lemma")
predictionAndLabel.show(100, false)
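Because transform keeps the input columns, the same output can also be filtered to read the texts the model got wrong. A small sketch, assuming predictions is model.transform(test) from the pipeline above, so it has prediction, indexracist, racist and lemma; the object InspectPredictions is a hypothetical helper, not part of the Spark API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper; the column names follow the question's data.
object InspectPredictions {
  // Keep only rows whose predicted index differs from the indexed true label,
  // together with the original tokens so the text can be read directly.
  def misclassified(predictions: DataFrame): DataFrame =
    predictions
      .filter(col("prediction") =!= col("indexracist"))
      .select("prediction", "indexracist", "racist", "lemma")
}
```

Usage: InspectPredictions.misclassified(model.transform(test)).show(100, false)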
Hope it helps; otherwise, comment with your problems.
Upvotes: 1
Reputation: 2618
When selecting the output columns to display, include the "lemma" column as well, so that it is written out along with the label and features columns.
For more details, please refer to "How to create correct data frame for classification in Spark ML". That post is a little similar to your question; check whether it helps.
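If you stay with the mllib API from the question, one way to follow this advice is to keep lemma next to each LabeledPoint in the RDD, so the text travels through randomSplit and predict. A minimal sketch, assuming a DataFrame shaped like changedTypedf (indexracist as Double, features as an ml vector from IDF, lemma as an array of strings); the object KeepLemmaWithMllib, the method predictWithText and the fixed seed are illustrative choices, not part of the original code.

```scala
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper; assumes df has "indexracist" (Double),
// "features" (ml Vector) and "lemma" (array<string>) columns.
object KeepLemmaWithMllib {
  def predictWithText(spark: SparkSession, df: DataFrame, seed: Long = 42L): DataFrame = {
    import spark.implicits._
    // Pair every LabeledPoint with its tokens instead of dropping the text.
    val withText = df.select("indexracist", "features", "lemma").rdd.map { row =>
      val point = LabeledPoint(
        row.getAs[Double]("indexracist"),
        Vectors.fromML(row.getAs[MLVector]("features")))
      (row.getSeq[String](row.fieldIndex("lemma")), point)
    }
    // Split the pairs, so each test point still knows which text it came from.
    val Array(training, test) = withText.randomSplit(Array(0.6, 0.4), seed)
    // Train on the LabeledPoints only; the tokens are not used as extra features.
    val model = NaiveBayes.train(training.map(_._2), lambda = 1.0, modelType = "multinomial")
    // Emit (prediction, label, lemma) rows, matching the table in the question.
    test
      .map { case (lemma, p) => (model.predict(p.features), p.label, lemma) }
      .toDF("prediction", "label", "lemma")
  }
}
```

Usage: KeepLemmaWithMllib.predictWithText(spark, changedTypedf).show(100, false). Keeping the text in the same tuple avoids a later join, which would otherwise need a unique row id column to match predictions back to their rows.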
Upvotes: 0
Reputation: 1099
Try using the ml package instead of the mllib package. For an example, refer to https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession
object NaiveBayesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("NaiveBayesExample")
      .getOrCreate()
    // Load the data stored in LIBSVM format as a DataFrame.
    val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    // Split the data into training and test sets (40% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.6, 0.4))
    // Train a NaiveBayes model.
    val model = new NaiveBayes()
      .fit(trainingData)
    // Select example rows to display.
    val predictions = model.transform(testData)
    predictions.show()
    // Select (prediction, true label) and compute test accuracy.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println("Test set accuracy = " + accuracy)
    spark.stop()
  }
}
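The example reads its label from a column named label, as in the LIBSVM sample data. With the question's data the indexed label is indexracist, so when adapting the example the evaluator's label column has to change. A sketch, assuming a predictions DataFrame with indexracist and prediction columns; QuestionAccuracy is a hypothetical name.

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.DataFrame

// Hypothetical helper: accuracy on the question's column names.
object QuestionAccuracy {
  def accuracy(predictions: DataFrame): Double =
    new MulticlassClassificationEvaluator()
      .setLabelCol("indexracist")   // indexed label from StringIndexer
      .setPredictionCol("prediction") // default output column of NaiveBayes
      .setMetricName("accuracy")
      .evaluate(predictions)
}
```

The evaluator only reads these two columns, so lemma can stay in the same DataFrame for display.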
Upvotes: 1
Reputation: 40380
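You just need to transform your data and show the result, as follows (model must be a model from the ml package; the mllib NaiveBayesModel used in the question has predict but no transform method):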
val predictions = model.transform(test)
predictions.show()
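To get exactly the Prediction / Label / lemma table asked for in the question, select and rename the columns after transform. A sketch, assuming predictions is the output of an ml model's transform on the question's data, so indexracist and lemma are still present; PredictionTable is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper that shapes ml predictions into the requested table.
object PredictionTable {
  def table(predictions: DataFrame): DataFrame =
    predictions.select(
      col("prediction").as("Prediction"),
      col("indexracist").as("Label"),
      col("lemma"))
}
```

Usage: PredictionTable.table(model.transform(test)).show(100, false)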
Upvotes: 1