I'm trying to do sentiment analysis on tweets with Spark MLlib. After pre-processing the data and converting it to the appropriate format, I call NaiveBayes's train method to obtain a model, but it fails with an exception. Here is the stack trace:
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$
at scala.collection.Iterator$$anon$
at scala.collection.IndexedSeqLike$
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:108)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:120)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:108)
at org.apache.spark.mllib.classification.NaiveBayes$.train(NaiveBayes.scala:467)
at org.jc.sparknaivebayes.main.NaiveBayesTrain$delayedInit$body.apply(NaiveBayesTrain.scala:53)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at org.jc.sparknaivebayes.main.NaiveBayesTrain$.main(NaiveBayesTrain.scala:12)
at org.jc.sparknaivebayes.main.NaiveBayesTrain.main(NaiveBayesTrain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$
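The top frames of the trace show `head` being called on an array (`ArrayOps$ofRef.head`) from inside `NaiveBayes.train`, which suggests that whatever collection the trainer was reading at that point had no elements. A minimal, Spark-free sketch of how this exception type arises is below; the object and method names are made up for illustration and are not part of Spark.

```scala
import scala.util.Try

// Hypothetical helper object, for illustration only.
object HeadOnEmpty {
  // `head` throws NoSuchElementException when the array is empty;
  // wrapping the call in Try lets the failure be inspected instead of crashing.
  def firstOf[A](xs: Array[A]): Try[A] = Try(xs.head)

  // `headOption` is the non-throwing alternative: it returns None for an empty array.
  def firstOptionOf[A](xs: Array[A]): Option[A] = xs.headOption
}
```

Seen this way, the exception is more likely a symptom of empty input reaching the trainer than of a problem in the trainer itself.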
This is my main method:
val csvFiles = args(0).split(",")
val modelStore = args(1)
val docs = TweetParser.parseAll(csvFiles, sc)
val termDocs = Tokenizer.tokenizeAll(docs)
val termDocsRdd = sc.parallelize[TermDoc](termDocs.toSeq)
val numDocs = termDocsRdd.count()
//val terms = termDocsRdd.flatMap(_.terms).distinct().collect().sortBy(identity)
val terms = termDocsRdd.flatMap(_.terms).distinct().sortBy(identity)
val termDict = new Dictionary(terms)
//val labels = termDocsRdd.flatMap(_.labels).distinct().collect()
val labels = termDocsRdd.flatMap(_.labels).distinct()
val labelDict = new Dictionary(labels)
val idfs = (termDocsRdd.flatMap(termDoc =>, _))).distinct().groupBy(_._2) collect {
case (term, docs) if docs.size > 3 =>
term -> (numDocs.toDouble / docs.size.toDouble)
val tfidfs = termDocsRdd flatMap {
termDoc =>
val termPairs: Seq[(Int, Double)] = termDict.tfIdfs(termDoc.terms, idfs) {
label =>
val labelId = labelDict.indexOf(label).toDouble
val vector = Vectors.sparse(termDict.count.toInt, termPairs)
LabeledPoint(labelId, vector)
val model = NaiveBayes.train(tfidfs)
Dictionary class is here:
class Dictionary(dict: RDD[String]) extends Serializable {
  //val builder = ImmutableBiMap.builder[String, Long]()
  //dict.zipWithIndex.foreach(e => builder.put(e._1, e._2))
  //val termToIndex =
  val termToIndex = dict.zipWithIndex()
  //lazy val indexToTerm = termToIndex.inverse()
  lazy val indexToTerm = dict.zipWithIndex().map {
    case (k, v) => (v, k)
  } //converts from (a, 0),(b, 1),(c, 2) to (0, a),(1, b),(2, c)
  val count = termToIndex.count().toInt
  def indexOf(term: String): Int = termToIndex.lookup(term).headOption.getOrElse[Long](-1).toInt
  def valueOf(index: Int): String = indexToTerm.lookup(index).headOption.getOrElse("")
  def tfIdfs (terms: Seq[String], idfs: Map[String, Double]): Seq[(Int, Double)] = {
    val filteredTerms = terms.filter(idfs contains)
    (filteredTerms.groupBy(identity).map {
      case (term, instances) => {
        val indexOfTerm: Int = indexOf(term)
        if (indexOfTerm < 0) (-1, 0.0) else (indexOf(term), (instances.size.toDouble / filteredTerms.size.toDouble) * idfs(term))
      }
    }).filter(p => p._1.toInt >= 0).toSeq.sortBy(_._1)
  }
  def vectorize(tfIdfs: Iterable[(Int, Double)]) = {
    Vectors.sparse(dict.count().toInt, tfIdfs.toSeq)
  }
}
Document class looks like this:
case class Document(docId: String, body: String = "", labels: Set[String] = Set.empty)
TermDoc class:
case class TermDoc(doc: String, labels: Set[String], terms: Seq[String])
I'm stuck at this step. I really need to get this job done, but I'm having a lot of trouble finding helpful information about it. Thanks in advance.
P.S: This is based on chimpler's blog:
UPDATE: New code for CSV parser and document builder.
import org.apache.spark.SparkContext
// Created by cespedjo on 14/02/2017.
object TweetParser extends Serializable{
val headerPart = "polarity"
val mentionRegex = """@(.)+?\s""".r
val fullRegex = """(\d+),(.+?),(N|P|NEU|NONE)(,\w+|;\w+)*""".r
def parseAll(csvFiles: Iterable[String], sc: SparkContext) = csvFiles flatMap(csv => parse(csv, sc))
def parse(csvFile: String, sc: SparkContext) = {
val csv = sc.textFile(csvFile)
val docs = scala.collection.mutable.ArrayBuffer.empty[Document]
line => if (!line.contains(headerPart)) docs += buildDocument(line)
def buildDocument(line: String): Document = {
val fullRegex(id, txt, snt, opt) = line
if (id != null && txt != null && snt != null)
new Document(id, mentionRegex.replaceAllIn(txt, ""), Set(snt))
new Document("INVALID")
case class Document(docId: String, body: String = "", labels: Set[String] = Set.empty)
I think the problem is that some documents don't produce any termPairs, so they yield empty data points, and you cannot train on those. Try changing your code to skip them:
val tfidfs = termDocsRdd flatMap {
termDoc =>
val termPairs: Seq[(Int, Double)] = termDict.tfIdfs(termDoc.terms, idfs)
if (termPairs.nonEmpty) { {
label =>
val labelId = labelDict.indexOf(label).toDouble
val vector = Vectors.sparse(termDict.count.toInt, termPairs)
LabeledPoint(labelId, vector)
} else {
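As a rough illustration of the guard described above, here is a Spark-free sketch using plain Scala collections. `Point`, `EmptyPointGuard`, the label encoding and the simplified `termPairs` helper are stand-ins invented for this example, not part of MLlib or of the question's code. `TermDoc` mirrors the question's case class, and using only the first label is an assumption.

```scala
// Same shape as the question's TermDoc.
final case class TermDoc(doc: String, labels: Set[String], terms: Seq[String])

// Hypothetical stand-in for MLlib's LabeledPoint with a sparse vector.
final case class Point(label: Double, features: Seq[(Int, Double)])

object EmptyPointGuard {
  // Simplified stand-in for termDict.tfIdfs: keeps only terms found in the
  // index and returns (termIndex, relative frequency) pairs sorted by index.
  def termPairs(terms: Seq[String], index: Map[String, Int]): Seq[(Int, Double)] = {
    val known = terms.filter(index.contains)
    known.groupBy(identity).toSeq
      .map { case (term, hits) => index(term) -> hits.size.toDouble / known.size }
      .sortBy(_._1)
  }

  // Documents with no usable terms, or with no label, are dropped by flatMap
  // instead of being turned into empty training points.
  def toPoints(docs: Seq[TermDoc], index: Map[String, Int]): Seq[Point] =
    docs.flatMap { d =>
      val pairs = termPairs(d.terms, index)
      if (pairs.nonEmpty)
        d.labels.headOption.map(l => Point(if (l == "P") 1.0 else 0.0, pairs)) // assumed encoding
      else
        None
    }
}
```

If every document ends up without term pairs, the result is still empty and training will fail in the same way, so it may be worth checking `tfidfs.count()` before calling `NaiveBayes.train`.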