Stefan Falk

Reputation: 25397

How to write Iterator[String] result from mapPartitions into one file?

I am new to Spark and Scala, which is why I am having quite a hard time getting through this.

What I intend to do is pre-process my data with Stanford CoreNLP using Spark. I understand that I have to use mapPartitions in order to have one StanfordCoreNLP instance per partition, as suggested in this thread. However, I lack the knowledge to proceed from here.

In the end I want to train word vectors on this data, but for now I would be happy to find out how I can take my processed data from here and write it to another file.

This is what I have so far:

import java.util.Properties

import com.google.gson.Gson
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.util.CoreMap
import masterthesis.code.wordvectors.Review
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._

object ReviewPreprocessing {

  def main(args: Array[String]): Unit = {

    // `sc` was never defined in this snippet; a local master is assumed here
    val conf = new SparkConf().setAppName("ReviewPreprocessing").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val resourceUrl = getClass.getResource("amazon-reviews/reviews_Electronics.json")
    val file = sc.textFile(resourceUrl.getPath)

    val linesPerPartition = file.mapPartitions( lineIterator => {

      val props = new Properties()
      props.put("annotators", "tokenize, ssplit, pos, lemma")

      // An immutable List has no add(); collect the results in a mutable buffer
      val sentencesAsTextList = scala.collection.mutable.ListBuffer[String]()
      // One pipeline and one Gson instance per partition, reused for every line
      val pipeline = new StanfordCoreNLP(props)
      val gson = new Gson()

      while(lineIterator.hasNext) {

        val line = lineIterator.next
        val review = gson.fromJson(line, classOf[Review])
        val doc = new Annotation(review.getReviewText)

        pipeline.annotate(doc)

        val sentences : java.util.List[CoreMap] = doc.get(classOf[SentencesAnnotation])
        val sb = new StringBuilder()

        sentences.foreach( sentence => {
          val tokens = sentence.get(classOf[TokensAnnotation])
          tokens.foreach( token => {
            sb.append(token.get(classOf[LemmaAnnotation]))
            sb.append(" ")
          })
        })
        // trim drops the trailing space and also copes with an empty review text
        sentencesAsTextList += sb.toString.trim
      }

      sentencesAsTextList.iterator
    })       

    System.exit(0)
  }

}

How would I, e.g., write this result into one single file? The ordering does not matter here; I guess it is lost at this point anyway.

Upvotes: 3

Views: 1965

Answers (1)

evgenii

Reputation: 1235

If you call saveAsTextFile directly on your RDD, you end up with as many output files as you have partitions. To get just one, you can either coalesce everything into one partition (the output path is still a directory, now holding a single part file), like

sc.textFile("/path/to/file")
  .mapPartitions(someFunc())
  .coalesce(1)
  .saveAsTextFile("/path/to/another/file")

Or (just for fun) you could pull the partitions to the driver one by one and save all the data yourself.

val it = sc.textFile("/path/to/file")
  .mapPartitions(someFunc())
  .toLocalIterator

while(it.hasNext) {
  writeToFile(it.next())
}
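
Note that writeToFile above is left undefined. A minimal sketch of how that second approach might look, assuming a hypothetical helper writeLines that takes the whole iterator and writes one line per element (the object name, the helper name and its signature are illustrative only and not part of Spark):

```scala
import java.io.{BufferedWriter, File}
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object SingleFileWriter {

  // Hypothetical helper (not part of Spark): writes each element of the
  // iterator as one line of `target` and returns the number of lines written.
  def writeLines(lines: Iterator[String], target: File): Long = {
    val writer: BufferedWriter =
      Files.newBufferedWriter(target.toPath, StandardCharsets.UTF_8)
    var count = 0L
    try {
      lines.foreach { line =>
        writer.write(line)
        writer.newLine()
        count += 1
      }
    } finally {
      writer.close() // close the file even if writing fails part-way through
    }
    count
  }
}
```

With Spark this would probably be called as SingleFileWriter.writeLines(rdd.toLocalIterator, new File("/path/to/another/file")). Since toLocalIterator fetches one partition at a time, the driver only needs enough memory for the largest partition rather than for the whole data set.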

Upvotes: 1
