Klue

Reputation: 1357

How to convert Seq to RDD when working with spark streaming context

I am using TestSuiteBase to write some tests for Spark Streaming (with a streaming context ssc). I then create dummy data as output: Seq[Seq[(Double, Double)]]. Finally, I want to apply a function to output, but this function accepts an RDD[(Double, Double)], not a Seq[Seq[(Double, Double)]].

To solve this, I am thinking of using val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten), but how and where exactly do I get the Spark context sc from ssc? Or is there a way to create the dummy data directly as an RDD, without going through Seq?
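To make the intent concrete, here is a sketch of the conversion I have in mind (SomeFuncThatAcceptsRDD stands in for my real function; sc is exactly the piece I am missing):

 // flatten the per-batch results into one collection, then distribute it as an RDD
 val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
 val rdd: RDD[(Double, Double)] = sc.parallelize(output.flatten) // where does sc come from?
 val metrics = new SomeFuncThatAcceptsRDD(rdd)

The relevant part of my test suite: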

class StreamingTestLR extends SparkFunSuite with TestSuiteBase {

  // use longer wait time to ensure job completion
  override def maxWaitTimeMillis: Int = 20000

  var ssc: StreamingContext = _

  override def afterFunction() {
    super.afterFunction()
    if (ssc != null) {
      ssc.stop()
    }
  }

//...

val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

// THE PROBLEM IS HERE!!!
// val metrics = new SomeFuncThatAcceptsRDD(rdd)

}

UPDATE

Here is the full test, using ssc.sparkContext as suggested in the answer below:

  // Test whether prediction accuracy increases when using hyper-parameter optimization
  // in order to learn Y = 10*X1 + 10*X2 on streaming data
  test("Test 1") {
    // create model initialized with zero weights
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(0.0, 0.0))
      .setStepSize(0.2)
      .setNumIterations(25)

    // generate sequence of simulated data for testing
    val numBatches = 10
    val nPoints = 100
    val testInput = (0 until numBatches).map { i =>
      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1))
    }
    // setupStreams builds an input DStream from testInput and applies the given
    // operation to it; the operation's output is what runStreams collects
    withStreamingContext(setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
      model.trainOn(inputDStream)
      model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
    })) { ssc =>
      val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

      // get the SparkContext from the StreamingContext and parallelize the flattened batches
      val rdd: RDD[(Double, Double)] = ssc.sparkContext.parallelize(output.flatten)

      // Instantiate metrics object
      val metrics = new RegressionMetrics(rdd)

      // Squared error
      println(s"MSE = ${metrics.meanSquaredError}")
      println(s"RMSE = ${metrics.rootMeanSquaredError}")

      // R-squared
      println(s"R-squared = ${metrics.r2}")

      // Mean absolute error
      println(s"MAE = ${metrics.meanAbsoluteError}")

      // Explained variance
      println(s"Explained variance = ${metrics.explainedVariance}")
    }
  }

Upvotes: 2

Views: 1404

Answers (1)

Vitalii Kotliarenko

Reputation: 2967

Try this:

 class MyTestSuite extends TestSuiteBase with BeforeAndAfter {

   test("my test") {
     withTestServer(new TestServer()) { testServer =>
       // Start the server
       testServer.start()
       // Set up the streaming context and input streams
       withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
         // the StreamingContext exposes its underlying SparkContext
         val rdd = ssc.sparkContext.parallelize(output.flatten)
         // your code here
         testServer.stop()
         ssc.stop()
       }
     }
   }
 }

More details here: https://github.com/apache/spark/blob/master/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
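The key point is that a StreamingContext keeps a reference to the SparkContext it runs on and exposes it as ssc.sparkContext, so you can call parallelize (or any other SparkContext method) on it directly inside the test.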

Upvotes: 3
