Brian
Brian

Reputation: 7326

Spark MLib - Create LabeledPoint from RDD[Vector] features and RDD[Vector] label

I am building a training set using two text files representing documents and labels.

Documents.txt

hello world
hello mars

Labels.txt

0
1

I have read in these files and converted my document data to a tf-idf weighted term-document matrix which is represented as a RDD[Vector]. I have also read-in and created a RDD[Vector] for my labels:

val docs: RDD[Seq[String]] = sc.textFile("Documents.txt").map(_.split(" ").toSeq)
val labs: RDD[Vector] = sc.textFile("Labels.txt")
  .map(s => Vectors.dense(s.split(',').map(_.toDouble)))

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(docs)
tf.cache()

val idf = new IDF(minDocFreq = 3).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

I would like to use tfidf and labsto create a RDD[LabeledPoint], but I am not sure how to apply a mapping with two different RDDs. Is this even possible/efficient, or do I need to rethink my approach?

Upvotes: 0

Views: 316

Answers (1)

zero323
zero323

Reputation: 330173

One way to handle this is to join based on indices:

import org.apache.spark.RangePartitioner

// Add indices
val idfIndexed = idf.zipWithIndex.map(_.swap)
val labelsIndexed = labels.zipWithIndex.map(_.swap)

// Create range partitioner on larger RDD
val partitioner = new RangePartitioner(idfIndexed.partitions.size, idfIndexed)

// Join with custom partitioner
labelsIndexed.join(idfIndexed, partitioner).values

Upvotes: 2

Related Questions