Klue

Reputation: 1357

Convert DStream[(Double, Double)] to RDD[(Double, Double)]

I need to train a linear regression model on streaming data, which I read using textFileStream. The problem is that RegressionMetrics accepts an RDD[(Double, Double)], while the output of predictOnValues is a DStream[(Double, Double)]. How can I convert the output into an RDD[(Double, Double)] so that I can use RegressionMetrics?

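import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}

// ssc is an existing StreamingContext
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.dense(0.0, 0.0))
  .setStepSize(0.2)
  .setNumIterations(25)

val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)

model.trainOn(trainingData)

// output is a DStream[(Double, Double)] keyed by the label
val output = model.predictOnValues(testData.map(lp => (lp.label, lp.features)))

// Problem: RegressionMetrics expects an RDD, so this line does not compile
val metrics = new RegressionMetrics(output)
val rmse = metrics.rootMeanSquaredError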

Upvotes: 1

Views: 281

Answers (1)

Paweł Jurczenko

Reputation: 4471

A DStream is a sequence of RDDs, one for every data batch. Each of these underlying RDDs can be accessed using the foreachRDD method:

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).foreachRDD { rdd =>
  val metrics = new RegressionMetrics(rdd)
  val rmse = metrics.rootMeanSquaredError
  // do something with `rmse` here
}
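
For completeness, here is a minimal end-to-end sketch of the same approach. It is an illustration rather than a definitive implementation: the object name, app name, local master and 10-second batch interval are assumptions not taken from the question, and the paths are copied from the question as-is. Two details are worth noting. First, predictOnValues keeps the key, so each element is (label, prediction), whereas RegressionMetrics documents its input as (prediction, observation); RMSE is the same either way, but order-sensitive metrics such as r2 are not, so the sketch swaps the pair. Second, the sketch skips empty batches, since a batch interval with no new files yields an empty RDD.

import org.apache.spark.SparkConf
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingRmse {
  def main(args: Array[String]): Unit = {
    // App name, master and batch interval are illustrative assumptions.
    val conf = new SparkConf().setAppName("StreamingRmse").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Paths are the ones used in the question.
    val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
    val testData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)

    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))
      .setStepSize(0.2)
      .setNumIterations(25)

    model.trainOn(trainingData)

    // predictOnValues keeps the key, so each element is (label, prediction).
    val labelAndPrediction =
      model.predictOnValues(testData.map(lp => (lp.label, lp.features)))

    labelAndPrediction.foreachRDD { rdd =>
      // Skip batches in which no new test data arrived.
      if (!rdd.isEmpty()) {
        // Swap to (prediction, observation), the order RegressionMetrics documents.
        val predictionAndLabel = rdd.map { case (label, prediction) => (prediction, label) }
        val metrics = new RegressionMetrics(predictionAndLabel)
        println(s"RMSE for this batch: ${metrics.rootMeanSquaredError}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the metrics here are computed per batch. There is no single RDD covering the whole stream, so an overall RMSE would have to be accumulated across batches separately.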

Upvotes: 0
