I am building a training set using two text files representing documents and labels.
Documents.txt
hello world
hello mars
Labels.txt
0
1
I have read in these files and converted my document data to a tf-idf weighted term-document matrix, which is represented as an RDD[Vector]. I have also read in and created an RDD[Vector] for my labels:
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val docs: RDD[Seq[String]] = sc.textFile("Documents.txt").map(_.split(" ").toSeq)
val labs: RDD[Vector] = sc.textFile("Labels.txt")
  .map(s => Vectors.dense(s.split(',').map(_.toDouble)))

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(docs)
tf.cache()

val idf = new IDF(minDocFreq = 3).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
I would like to use tfidf and labs to create an RDD[LabeledPoint], but I am not sure how to apply a mapping across two different RDDs. Is this even possible/efficient, or do I need to rethink my approach?
One way to handle this is to join based on indices:
import org.apache.spark.RangePartitioner

// Add indices (note: we index the tfidf RDD, not the fitted IDF model)
val tfidfIndexed = tfidf.zipWithIndex.map(_.swap)
val labelsIndexed = labs.zipWithIndex.map(_.swap)

// Create a range partitioner on the larger RDD
val partitioner = new RangePartitioner(tfidfIndexed.partitions.size, tfidfIndexed)

// Join with the custom partitioner
labelsIndexed.join(tfidfIndexed, partitioner).values
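The join above yields an RDD[(Vector, Vector)] of label/feature pairs. Since the question asks for an RDD[LabeledPoint], one more map is needed; here is a minimal sketch, assuming each label vector carries a single class label in its first component:

import org.apache.spark.mllib.regression.LabeledPoint

// Sketch: take the class label from the first component of each label
// vector and pair it with the matching tf-idf feature vector
val training: RDD[LabeledPoint] = labelsIndexed
  .join(tfidfIndexed, partitioner)
  .values
  .map { case (label, features) => LabeledPoint(label(0), features) }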