lordingtar

Reputation: 1062

Can I convert an incoming stream of data into an array?

I'm trying to learn how to stream data and manipulate it, using the telecom churn dataset provided here. I've written a method to run this analysis in batch:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD, LogisticRegressionWithLBFGS, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
object batchChurn {
  def main(args: Array[String]): Unit = {
    // setting spark context
    val conf = new SparkConf().setAppName("churn")
    val sc = new SparkContext(conf)

    // loading and mapping data into an RDD of LabeledPoint
    val csv = sc.textFile("file://filename.csv")
    val data = csv.map { line =>
      val parts = line.split(",").map(_.trim)
      val stringvec = Array(parts(1)) ++ parts.slice(4, 20)
      val label = parts(20).toDouble
      val vec = stringvec.map(_.toDouble)
      LabeledPoint(label, Vectors.dense(vec))
    }

    // 70/30 train/test split
    val splits = data.randomSplit(Array(0.7, 0.3))
    val (training, testing) = (splits(0), splits(1))

    // random forest parameters
    val numClasses = 2
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 6
    val featureSubsetStrategy = "auto"
    val impurity = "gini"
    val maxDepth = 7
    val maxBins = 32

    val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo,
      numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

    // evaluate on the held-out set
    val labelAndPreds = testing.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
  }
}

I've had no problems with this. Now, I looked at the NetworkWordCount example provided on the spark website, and changed the code slightly to see how it would behave.

val ssc = new StreamingContext(sc, Seconds(5))

val lines = ssc.socketTextStream("127.0.0.1", 9999)

val data = lines.flatMap(_.split(","))

My question is: is it possible to convert this DStream into an array that I can feed into my analysis code? When I try to call toArray on the result of val data = lines.flatMap(_.split(",")), I get: error: value toArray is not a member of org.apache.spark.streaming.dstream.DStream[String]

Upvotes: 1

Views: 2416

Answers (3)

Francois G

Reputation: 11985

You cannot put all the elements of a DStream in an array because those elements will keep being read over the wire, and your array would have to be indefinitely extensible.

Adapting this decision tree model to a streaming setting, where training and testing data arrive continuously, is not trivial for algorithmic reasons: while the answers mentioning collect are technically correct, they are not the appropriate solution to what you're trying to do.

If you want to run decision trees on a Stream in Spark, you may want to look at Hoeffding trees.

Upvotes: 0

Ajit

Reputation: 128

DStream.foreachRDD gives you an RDD[String] for each interval, so you can collect each batch into an array:

import scala.collection.mutable.ArrayBuffer

val arr = new ArrayBuffer[String]()
data.foreachRDD { rdd =>
  arr ++= rdd.collect()
}

Also keep in mind that you could end up with far more data than you want on your driver, since a DStream can be huge.
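If all you want is to score each incoming batch with the model you already trained in batch mode, you can avoid collecting to the driver entirely and run the prediction on each RDD instead. A minimal sketch, assuming lines is the raw DStream from the question, the records use the same CSV layout as in batchChurn, and a trained model (RandomForestModel) is already in scope; these names are taken from the question, not from a tested pipeline:

// Sketch only: `lines`, the CSV layout, and the trained `model` are assumed from the question.
import org.apache.spark.mllib.linalg.Vectors

lines.foreachRDD { rdd =>
  val predictions = rdd
    .map(_.split(",").map(_.trim))
    .map(parts => Vectors.dense((Array(parts(1)) ++ parts.slice(4, 20)).map(_.toDouble)))
    .map(features => model.predict(features))   // runs on the executors, not the driver
  predictions.take(10).foreach(println)          // bring back only a small sample per batch
}

This keeps the heavy data distributed and only pulls a handful of predictions back per interval.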

To limit the data for your analysis, I would do it this way:

data.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet

Upvotes: 1

Knows Not Much

Reputation: 31586

Your DStream contains many RDDs. You can get access to them using the foreachRDD function.

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html#foreachRDD(scala.Function1)

Each RDD can then be converted to an array using the collect function.
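A minimal sketch of that pattern, assuming lines is the DStream from the question; what you do with the array inside the closure is up to you:

// Sketch only: `lines` is the DStream from the question.
lines.foreachRDD { rdd =>
  val arr: Array[String] = rdd.collect()   // this batch's RDD, pulled to the driver as an array
  // ... run your array-based analysis on `arr` here ...
}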

This has already been shown here:

For each RDD in a DStream how do I convert this to an array or some other typical Java data type?

Upvotes: 1
