Reputation: 25397
I am new to Spark and Scala, which is why I am having quite a hard time getting through this.
What I intend to do is pre-process my data with Stanford CoreNLP using Spark. I understand that I have to use mapPartitions in order to have one StanfordCoreNLP instance per partition, as suggested in this thread. However, I lack the knowledge/understanding of how to proceed from here.
In the end I want to train word vectors on this data, but for now I would be happy to find out how I can get my processed data from here and write it into another file.
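For context, the word-vector step I have in mind afterwards would presumably look roughly like the following, using Spark MLlib's Word2Vec. lemmatizedSentences is just a placeholder name for the processed RDD[String] I am trying to produce (one whitespace-separated sentence per element), and I have not verified the exact setup:
import org.apache.spark.mllib.feature.Word2Vec

// split each processed review back into tokens, since Word2Vec.fit expects RDD[Iterable[String]]
val tokenized = lemmatizedSentences.map(_.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(tokenized)

// e.g. inspect the trained model
model.findSynonyms("camera", 10).foreach(println)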
This is what I got so far:
import java.util.Properties

import com.google.gson.Gson
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.util.CoreMap
import masterthesis.code.wordvectors.Review
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

object ReviewPreprocessing {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ReviewPreprocessing")
    val sc = new SparkContext(conf)

    val resourceUrl = getClass.getResource("amazon-reviews/reviews_Electronics.json")
    val file = sc.textFile(resourceUrl.getPath)

    val linesPerPartition = file.mapPartitions { lineIterator =>
      // One pipeline per partition, so the expensive CoreNLP models are loaded only once per partition.
      val props = new Properties()
      props.put("annotators", "tokenize, ssplit, pos, lemma")
      val pipeline = new StanfordCoreNLP(props)
      val gson = new Gson()

      // Use a mutable buffer; an immutable List has no add()
      val sentencesAsTextList = ListBuffer[String]()

      while (lineIterator.hasNext) {
        val line = lineIterator.next
        val review = gson.fromJson(line, classOf[Review])
        val doc = new Annotation(review.getReviewText)
        pipeline.annotate(doc)

        val sentences: java.util.List[CoreMap] = doc.get(classOf[SentencesAnnotation])
        val sb = new StringBuilder()
        sentences.foreach { sentence =>
          val tokens = sentence.get(classOf[TokensAnnotation])
          tokens.foreach { token =>
            sb.append(token.get(classOf[LemmaAnnotation]))
            sb.append(" ")
          }
        }
        if (sb.nonEmpty) sb.setLength(sb.length - 1) // drop the trailing space
        sentencesAsTextList += sb.toString
      }

      sentencesAsTextList.iterator
    }

    System.exit(0)
  }
}
How would I, for example, write this result into a single file? The ordering does not matter here; I guess the ordering is lost at this point anyway.
Upvotes: 3
Views: 1965
Reputation: 1235
If you call saveAsTextFile
directly on your RDD
, you'll end up with as many output files as you have partitions. To get just one, you can either coalesce everything into a single partition, like
sc.textFile("/path/to/file")
  .mapPartitions(someFunc())
  .coalesce(1)
  .saveAsTextFile("/path/to/another/file")
Or (just for fun) you could fetch all partitions to the driver one by one and save all the data yourself.
val it = sc.textFile("/path/to/file")
  .mapPartitions(someFunc())
  .toLocalIterator

while(it.hasNext) {
  writeToFile(it.next())
}
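writeToFile is left undefined above; a minimal sketch using plain Java IO could look like this, where the output path is just an example (in practice you would rather keep a single writer open for the whole loop instead of reopening the file per line):
import java.io.{FileWriter, PrintWriter}

// append each element to a single local file; append mode so repeated calls accumulate
def writeToFile(line: String): Unit = {
  val out = new PrintWriter(new FileWriter("/path/to/another/file", true))
  try out.println(line) finally out.close()
}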
Upvotes: 1