Reputation: 1062
I'm trying to learn how to work with streaming data, using the telecom churn dataset provided here. I've written a batch job that trains and evaluates a model on it:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD, LogisticRegressionWithLBFGS, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

object batchChurn {
  def main(args: Array[String]): Unit = {
    // setting spark context
    val conf = new SparkConf().setAppName("churn")
    val sc = new SparkContext(conf)

    // loading and mapping data into RDD
    val csv = sc.textFile("file://filename.csv")
    val data = csv.map { line =>
      val parts = line.split(",").map(_.trim)
      val stringvec = Array(parts(1)) ++ parts.slice(4, 20)
      val label = parts(20).toDouble
      val vec = stringvec.map(_.toDouble)
      LabeledPoint(label, Vectors.dense(vec))
    }

    val splits = data.randomSplit(Array(0.7, 0.3))
    val (training, testing) = (splits(0), splits(1))

    val numClasses = 2
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 6
    val featureSubsetStrategy = "auto"
    val impurity = "gini"
    val maxDepth = 7
    val maxBins = 32

    val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

    val labelAndPreds = testing.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
  }
}
I've had no problems with this. Next, I looked at the NetworkWordCount example provided on the Spark website and changed the code slightly to see how it would behave.
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("127.0.0.1", 9999)
val data = lines.flatMap(_.split(","))
My question is: is it possible to convert this DStream to an array that I can feed into my analysis code? Currently, when I try to call toArray on the result of val data = lines.flatMap(_.split(",")), the compiler reports:
error: value toArray is not a member of org.apache.spark.streaming.dstream.DStream[String]
Upvotes: 1
Views: 2416
Reputation: 11985
You cannot put all the elements of a DStream in an array, because those elements keep being read over the wire and your array would have to grow without bound.
Adapting this decision tree model to a streaming mode, where training and testing data arrive continuously, is not trivial for algorithmic reasons. While the answers mentioning collect are technically correct, they are not the appropriate solution for what you're trying to do.
If you want to run decision trees on a stream in Spark, you may want to look at Hoeffding trees.
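For context, a Hoeffding tree can learn from a stream without storing it because it only splits a leaf once the Hoeffding bound says the best split is reliably better than the runner-up. Below is a minimal sketch of that split test in plain Scala (no Spark). The object and function names are hypothetical, and this is only the decision rule, not a full tree learner:

```scala
object HoeffdingBoundSketch {
  // Hoeffding bound: with probability 1 - delta, the true mean of a variable
  // whose values span `range` lies within epsilon of the mean of n samples.
  def hoeffdingBound(range: Double, delta: Double, n: Long): Double =
    math.sqrt(range * range * math.log(1.0 / delta) / (2.0 * n))

  // Split a leaf once the gap between the best and second-best split gains
  // exceeds epsilon for the n examples seen at that leaf so far.
  def shouldSplit(bestGain: Double, secondBestGain: Double,
                  range: Double, delta: Double, n: Long): Boolean =
    (bestGain - secondBestGain) > hoeffdingBound(range, delta, n)

  def main(args: Array[String]): Unit = {
    // Assumed values for illustration: gain range 1.0 and delta = 1e-7.
    val delta = 1e-7
    for (n <- Seq(100L, 1000L, 10000L)) {
      val eps = hoeffdingBound(1.0, delta, n)
      println(s"n=$n epsilon=$eps split=${shouldSplit(0.30, 0.15, 1.0, delta, n)}")
    }
  }
}
```

The bound shrinks as more examples arrive, so a leaf that cannot be split confidently now may become splittable later without revisiting old data.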
Upvotes: 0
Reputation: 128
DStream.foreachRDD gives you an RDD[String] for each batch interval. You could, of course, collect each one into an array:
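import scala.collection.mutable.ArrayBuffer
val arr = new ArrayBuffer[String]()
// Runs on the driver once per batch interval and appends that batch's elements
data.foreachRDD { rdd =>
  arr ++= rdd.collect()
}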
Also keep in mind that you could end up with far more data than you want on the driver, since a DStream can be huge.
To limit the data for your analysis, I would do it this way:
data.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
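If the goal is only to score incoming rows with a model that was already trained in batch, one option that avoids collecting anything to the driver is to transform the DStream itself. This is a rough sketch, assuming each streamed line has the same column layout as the CSV in the question and that a trained RandomForestModel is available; StreamScoringSketch, parseFeatures and scoreStream are hypothetical names:

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.streaming.dstream.DStream

import scala.util.Try

object StreamScoringSketch {
  // Same column selection as the batch code in the question.
  // Returns None for lines that are too short or contain non-numeric fields.
  def parseFeatures(line: String): Option[Vector] = {
    val parts = line.split(",").map(_.trim)
    if (parts.length < 20) None
    else Try {
      val selected = Array(parts(1)) ++ parts.slice(4, 20)
      Vectors.dense(selected.map(_.toDouble))
    }.toOption
  }

  // Pairs every parsable row with the model's prediction, batch by batch.
  def scoreStream(lines: DStream[String],
                  model: RandomForestModel): DStream[(Vector, Double)] =
    lines
      .flatMap(line => parseFeatures(line).toList)
      .map(features => (features, model.predict(features)))
}
```

Calling something like StreamScoringSketch.scoreStream(lines, model).print() before ssc.start() would then show a few predictions per interval. Note that the stream is split into lines here, not flatMapped on commas as in the question, so that each row stays intact.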
Upvotes: 1
Reputation: 31586
Your DStream contains many RDDs. You can access them with the foreachRDD function, and each RDD can then be converted to an array with the collect function.
This has already been shown here:
For each RDD in a DStream how do I convert this to an array or some other typical Java data type?
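As a hedged illustration of the foreachRDD approach, the sketch below hands each batch to a callback as a plain Array[String], using take with a cap in place of collect so the driver never receives more than maxRows elements per interval. CollectPerBatchSketch, collectEachBatch, maxRows and handle are hypothetical names, not part of Spark:

```scala
import org.apache.spark.streaming.dstream.DStream

object CollectPerBatchSketch {
  // Registers an output operation that runs on the driver once per batch
  // interval and passes at most `maxRows` elements of that batch to `handle`.
  def collectEachBatch(data: DStream[String], maxRows: Int)
                      (handle: Array[String] => Unit): Unit =
    data.foreachRDD { rdd =>
      val batch: Array[String] = rdd.take(maxRows)
      // Skip intervals in which nothing arrived.
      if (batch.nonEmpty) handle(batch)
    }
}
```

It would be registered before ssc.start(), for example CollectPerBatchSketch.collectEachBatch(data, 1000)(arr => println(arr.length)).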
Upvotes: 1