Reputation: 11
I am a beginner trying to fetch tweets with Spark Streaming in Scala, using some filter keywords. After streaming, is it possible to keep only the tweets whose geolocation is not null? I want to save the tweets to Elasticsearch, so before saving the tweet map, can I filter for the tweets that have geolocation information and save only those? I build the JSON with json4s.JsonDSL from fields of the tweet. This is the sample code:
val stream = TwitterUtils.createStream(ssc, None, filters)
val tweetMap = stream.map(status => {
  val tweetMap =
    ("location" -> Option(status.getGeoLocation).map(geo => { s"${geo.getLatitude},${geo.getLongitude}" })) ~
    ("UserLang" -> status.getUser.getLang) ~
    ("UserLocation" -> Option(status.getUser.getLocation)) ~
    ("UserName" -> status.getUser.getName) ~
    ("Text" -> status.getText) ~
    ("TextLength" -> status.getText.length) ~
    // Tokenize the tweet text and keep only the words starting with #
    ("HashTags" -> status.getText.split(" ").filter(_.startsWith("#")).mkString(" ")) ~
    ("PlaceCountry" -> Option(status.getPlace).map(pl => { s"${pl.getCountry}" }))
  tweetMap
})
tweetMap.map(s => List("Tweet Extracted")).print
// Each batch is saved to Elasticsearch
tweetMap.foreachRDD { tweets => EsSpark.saveToEs(tweets, "sparksender/tweets") }
// Before this step, is there a way to filter out tweets which have "location" as null?
I based the code on this GitHub project: https://github.com/luvgupta008/ScreamingTwitter/blob/master/src/main/scala/com/spark/streaming/TwitterTransmitter.scala
Upvotes: 0
Views: 465
Reputation: 1373
Check out the filter method, which is available on a DStream as well as on an RDD. It takes a predicate function (a: A) => Boolean. If the predicate returns true, the element is kept in the result; if it returns false, the element is dropped.
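// Apply the filter to the raw Status stream, before mapping to JSON:
// tweetMap holds JSON objects, which have no getGeoLocation method.
val geoTagged = stream.filter(status => Option(status.getGeoLocation).isDefined)
// Then build tweetMap from geoTagged instead of stream.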
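To see the null check in isolation, here is a minimal sketch that runs without Spark or twitter4j. The `Geo` and `Tweet` case classes and the `GeoFilterSketch` object are hypothetical stand-ins (they are not part of any library); `Tweet.geo` may be null, which is assumed to mirror how `status.getGeoLocation` behaves for tweets without coordinates. The same predicate shape should carry over to `filter` on a DStream or RDD.

```scala
// Hypothetical stand-ins for twitter4j's GeoLocation and Status,
// so the sketch runs on plain Scala collections.
final case class Geo(latitude: Double, longitude: Double)
final case class Tweet(text: String, geo: Geo) // geo may be null

object GeoFilterSketch {
  // Predicate for filter: true only when a geolocation is present.
  // Option(null) is None, so null values are rejected.
  def hasGeo(t: Tweet): Boolean = Option(t.geo).isDefined

  // Format the location as "lat,lon", the same way the question does.
  def location(t: Tweet): Option[String] =
    Option(t.geo).map(g => s"${g.latitude},${g.longitude}")

  def main(args: Array[String]): Unit = {
    val tweets = Seq(
      Tweet("no location", null),
      Tweet("with location", Geo(52.5, 13.4))
    )
    // Only the geotagged tweet survives the filter.
    val geoTagged = tweets.filter(hasGeo)
    geoTagged.foreach(t => println(s"${t.text}: ${location(t).getOrElse("")}"))
  }
}
```

Filtering before the mapping step keeps the predicate simple, because it can inspect the original object rather than the JSON built from it.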
Upvotes: 0