Harika Punyamurthula

Reputation: 11

Spark streaming - filter tweets after streaming with geolocation

I am a beginner trying to collect tweets with Spark Streaming in Scala, using some filter keywords. Is it possible, after streaming, to keep only the tweets whose geolocation is not null? I am saving the tweets to Elasticsearch, so before saving the tweet map, can I filter for the tweets that have geolocation information and save only those? I build the JSON with json4s.JsonDSL from fields of the tweet. This is the sample code:

val stream = TwitterUtils.createStream(ssc, None, filters)
val tweetMap = stream.map(status => {
  val tweetMap =
      ("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~
      ("UserLang" -> status.getUser.getLang) ~
      ("UserLocation" -> Option(status.getUser.getLocation)) ~
      ("UserName" -> status.getUser.getName) ~
      ("Text" -> status.getText) ~
      ("TextLength" -> status.getText.length) ~
      // Tokenize the tweet text and keep only the words starting with #
      ("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~
      ("PlaceCountry" -> Option(status.getPlace).map(pl => { s"${pl.getCountry}" }))
  tweetMap
})

tweetMap.map(s => List("Tweet Extracted")).print

// Each batch is saved to Elasticsearch 
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets") }

// Before this step, is there a way to filter out the tweets whose "location" is null?

I based the code on this file from GitHub: https://github.com/luvgupta008/ScreamingTwitter/blob/master/src/main/scala/com/spark/streaming/TwitterTransmitter.scala

Upvotes: 0

Views: 465

Answers (1)

Quy

Reputation: 1373

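Check out the filter method, which is available on a DStream as well as on an RDD. It takes a predicate function (a: A) => Boolean. If the predicate returns true, the element is kept in the result; if it returns false, the element is dropped. Note that the elements of tweetMap are JSON objects rather than Status objects, so the getGeoLocation check has to run on stream, before the map:

val geoTagged = stream.filter(
  status => Option(status.getGeoLocation) match {
    case Some(_) => true
    case None => false
  })
// then build tweetMap from geoTagged.map(...) instead of stream.map(...)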
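
To try the predicate without a Spark or Twitter setup, here is a minimal sketch on a plain Scala List, whose filter behaves the same way element by element. FakeStatus and GeoPoint are hypothetical stand-ins for twitter4j's Status and GeoLocation, not real library types, and the coordinates are made up for illustration. The assumption being modelled is that the geolocation field is null when a tweet carries no coordinates.

```scala
// Hypothetical stand-ins for twitter4j types (assumption: geo is null
// when the tweet has no geolocation attached).
final case class GeoPoint(latitude: Double, longitude: Double)
final case class FakeStatus(text: String, geo: GeoPoint)

object GeoFilterSketch {
  // Predicate: true only when the possibly-null geo field is present.
  def hasGeo(status: FakeStatus): Boolean = Option(status.geo).isDefined

  // Mirrors the "location" field from the question: "latitude,longitude".
  def location(status: FakeStatus): Option[String] =
    Option(status.geo).map(g => s"${g.latitude},${g.longitude}")

  // Made-up sample data: two geotagged tweets and one without coordinates.
  val statuses: List[FakeStatus] = List(
    FakeStatus("tweet with coordinates", GeoPoint(17.5, 78.25)),
    FakeStatus("tweet without coordinates", null),
    FakeStatus("another geotagged tweet", GeoPoint(-33.5, 151.0))
  )

  // Filter first, then map: only geotagged tweets reach the mapping step.
  val geoTagged: List[FakeStatus] = statuses.filter(hasGeo)
  val locations: List[String] = geoTagged.flatMap(location)

  def main(args: Array[String]): Unit =
    locations.foreach(println)
}
```

The same hasGeo-style predicate should carry over to the stream of Status objects unchanged, since only the container type differs.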

Upvotes: 0
