amprie286

Reputation: 107

Looping through Map Spark Scala

In this code we have two files: athletes.csv, which contains athlete names, and twitter.test, which contains tweet messages. We want to find, for every line in twitter.test, the names that match a name in athletes.csv. We applied a map function to store the names from athletes.csv and want to check every name against every line in the test file.

import java.nio.charset.CodingErrorAction

import scala.io.{Codec, Source}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object twitterAthlete {

  def loadAthleteNames(): Map[String, String] = {

    // Handle character encoding issues:
    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

    // Build a Map from athlete name (field 1) to an info field (field 7),
    // read from athletes.csv.
    var athleteInfo: Map[String, String] = Map()
    val lines = Source.fromFile("../athletes.csv").getLines()
    for (line <- lines) {
      val fields = line.split(',')
      if (fields.length > 1) {
        athleteInfo += (fields(1) -> fields(7))
      }
    }

    athleteInfo
  }

  def parseLine(line: String): String = {
    val athleteInfo = loadAthleteNames()
    var hello = new String
    for ((k, v) <- athleteInfo) {
      if (line.contains(k)) {
        hello = k
      }
    }
    hello
  }


  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "twitterAthlete")

    val lines = sc.textFile("../twitter.test")
    val athleteInfo = loadAthleteNames()

    // Keep the tweet text (field 2) from lines that split into exactly 4 fields:
    val splitting = lines.map(x => x.split(";")).map(x => if (x.length == 4 && x(2).length <= 140) x(2))

    val container = splitting.map(x => for ((key, value) <- athleteInfo) if (x.toString.contains(key)) { key }).cache

    container.collect().foreach(println)

    // val mapping = container.map(x => (x, 1)).reduceByKey(_ + _)
    // mapping.collect().foreach(println)
  }
}

The first file looks like:

id,name,nationality,sex,height........  
001,Michael,USA,male,1.96 ...
002,Json,GBR,male,1.76 ....
003,Martin,female,1.73 . ...

The second file looks like:

time, id , tweet .....
12:00, 03043, some message that contain some athletes names  , .....
02:00, 03023, some message that contain some athletes names , .....

something like this ...

But I got an empty result after running this code; any suggestions are much appreciated.

The result I got is empty:

()....
()...
()...

but the result I expected is something like:

(name,1)
(other name,1)

Upvotes: 2

Views: 1760

Answers (2)

edkeveked

Reputation: 18381

You need to use yield to return a value from the for comprehension inside your map. Without yield, the comprehension evaluates to Unit, which is why you see () printed for every line.

 val container = splitting.map(x => for ((key, value) <- athleteInfo if x.toString.contains(key)) yield (key, 1)).cache
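To then get the (name, count) pairs the question expects, a minimal follow-up sketch (my addition, assuming the same splitting RDD and athleteInfo map from the question) flattens the per-line matches with flatMap and reduces by key:

 // Sketch: flatten the per-line matches, then count each matched name.
 val counts = splitting
   .flatMap(x => for ((key, _) <- athleteInfo if x.toString.contains(key)) yield (key, 1))
   .reduceByKey(_ + _)

 counts.collect().foreach(println)  // e.g. (name,1), (other name,2)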

Upvotes: 1

Silvio

Reputation: 4207

I think you should just start with the simplest option first...

I would use DataFrames so you can use the built-in CSV parsing and leverage Catalyst, Tungsten, etc.

Then you can use the built-in Tokenizer to split the tweets into words, explode, and do a simple join. Depending on how big or small the athlete-name data is, you'll end up with a more optimized broadcast join and avoid a shuffle.

import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.Tokenizer
import spark.implicits._  // needed outside the shell for the 'columnName symbol syntax

val tweets = spark.read.format("csv").load(...)
val athletes = spark.read.format("csv").load(...)

val tokenizer = new Tokenizer()
tokenizer.setInputCol("tweet")
tokenizer.setOutputCol("words")

val tokenized = tokenizer.transform(tweets)

val exploded = tokenized.withColumn("word", explode('words))

val withAthlete = exploded.join(athletes, 'word === 'name)

withAthlete.select(exploded("id"), 'name).show()
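If the athlete list is known to be small, you can also make the broadcast join explicit with the broadcast hint from org.apache.spark.sql.functions (a sketch of an optional variant, my addition rather than part of the answer above):

// Optional: force a broadcast of the small athletes DataFrame to avoid a shuffle.
val withAthleteBcast = exploded.join(broadcast(athletes), 'word === 'name)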

Upvotes: 1
