Reputation: 1357
I need to train a linear regression model on streaming data. I read the streaming data using textFileStream. The problem is that RegressionMetrics accepts an RDD[(Double, Double)], while output is a DStream[(Double, Double)].
How can I convert output into an RDD[(Double, Double)] so that I can use RegressionMetrics?
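import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

// ssc is an existing StreamingContext
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.dense(0.0, 0.0))
  .setStepSize(0.2)
  .setNumIterations(25)
val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
model.trainOn(trainingData)
val output = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))
val metrics = new RegressionMetrics(output) // type mismatch: output is a DStream, not an RDD
val rmse = metrics.rootMeanSquaredError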
Upvotes: 1
Views: 281
Reputation: 4471
A DStream is a sequence of RDDs, one for each data batch. Each of them can be accessed with the foreachRDD method:
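model.predictOnValues(testData.map(lp => (lp.label, lp.features))).foreachRDD { rdd =>
  // rdd holds (label, prediction) pairs; RegressionMetrics expects (prediction, observation)
  val metrics = new RegressionMetrics(rdd.map(_.swap))
  val rmse = metrics.rootMeanSquaredError
  // do something with `rmse` here
}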
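To make the per-batch behaviour concrete, here is a minimal Spark-free sketch of what the foreachRDD approach produces: one RMSE value per batch rather than a single value for the whole stream. The object PerBatchRmse, its two helpers, and the sample numbers are hypothetical and only for illustration. Each inner Seq stands in for one batch RDD of (prediction, observation) pairs, and returning None for an empty batch is a choice made in this sketch, not a statement about how RegressionMetrics behaves.

```scala
// Spark-free stand-in: each inner Seq plays the role of one batch RDD
// of (prediction, observation) pairs.
object PerBatchRmse {
  // Root mean squared error of one batch; None for an empty batch
  // (an assumption of this sketch).
  def rmse(batch: Seq[(Double, Double)]): Option[Double] =
    if (batch.isEmpty) None
    else {
      val sumSq = batch.map { case (pred, obs) => (pred - obs) * (pred - obs) }.sum
      Some(math.sqrt(sumSq / batch.size))
    }

  // Mirrors foreachRDD: one metric value per batch, in arrival order.
  def perBatch(batches: Seq[Seq[(Double, Double)]]): Seq[Option[Double]] =
    batches.map(rmse)

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq((1.0, 1.0), (2.0, 4.0)),  // hypothetical batch 0
      Seq.empty[(Double, Double)],  // a batch interval with no data
      Seq((3.0, 0.0), (0.0, 4.0))   // hypothetical batch 2
    )
    perBatch(batches).zipWithIndex.foreach { case (r, i) =>
      println(s"batch $i: ${r.map(_.toString).getOrElse("no data")}")
    }
  }
}
```

If a single metric over the whole stream is needed, the per-batch values would have to be combined separately, for example by keeping a running sum of squared errors and a running count.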
Upvotes: 0