Reputation: 5397
I am using the following code to store the output of Spark Streaming in Elasticsearch. I want to map the output to proper field names, i.e. (Key, OsName, PlatFormName, Mobile, BrowserName, Count), but as you can see, the fields are currently stored in ES as _1, _2, etc.
Moreover, I want to apply a filter before indexing the data in ES, e.g. index a record only if PlatFormName = "ubuntu". How do I do that?
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)
val pageCounts = realTimeAgg.map
pageCounts.foreachRDD { x =>
  if (x.toLocalIterator.nonEmpty) {
    EsSpark.saveToEs(x, "spark/ElasticSearch")
  }
}
ssc.start()
ssc.awaitTermination()
Output in Elasticsearch:
{
  "_index": "spark",
  "_type": "ElasticSearch",
  "_id": "AVTH0JPgzgtrAOUg77qq",
  "_score": 1,
  "_source": {
    "_1": {
      "_3": "Amiga",
      "_2": "AmigaOS 1.3",
      "_6": "SeaMonkey",
      "_1": "Usedcar",
      "_4": 0,
      "_5": 0
    },
    "_2": 1013
  }
}
Upvotes: 1
Views: 674
Reputation: 2226
The keys of the Elasticsearch document are _1, _2, etc. because you are storing a pair RDD of type (Tuple6, Long), and the fields of a Scala tuple have no names other than their positions.
To retain the field names, you should use a case class as the key.
val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_)
I am assuming that x is an instance of a case class and that you want to use all fields of that class for the reduction (i.e. for checking the equality of two case class instances). If not all fields of that class belong to its natural key for equality, then you have two options -
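When only some fields make up the key, one possible approach (an assumption here, not necessarily what the answer had in mind) is to map each record to a dedicated key case class and, after counting, to a flat case class that also carries the count. The sketch below uses plain Scala collections as a stand-in for the streaming job so it runs without Spark; Event, PageKey, PageCount, the referrer field and the helper names are all hypothetical.

```scala
// Hypothetical input record; `referrer` stands for a field that is NOT part of the key.
case class Event(key: String, os: String, platform: String,
                 mobile: Int, browser: String, referrer: String)

// Only the fields that identify a group. Case classes get equals/hashCode
// generated from their fields, so instances can be used as keys.
case class PageKey(key: String, os: String, platform: String,
                   mobile: Int, browser: String)

// Flat shape for the indexed document: named fields plus the count.
case class PageCount(key: String, osName: String, platformName: String,
                     mobile: Int, browserName: String, count: Int)

object NamedKeysSketch {
  def toKey(e: Event): PageKey =
    PageKey(e.key, e.os, e.platform, e.mobile, e.browser)

  def toDoc(k: PageKey, n: Int): PageCount =
    PageCount(k.key, k.os, k.platform, k.mobile, k.browser, n)

  // Plain-collections stand-in for map { x => (key, 1) } followed by reduceByKey(_ + _).
  def countByKey(events: Seq[Event]): Map[PageKey, Int] =
    events
      .map(e => (toKey(e), 1))
      .groupBy(_._1)
      .map { case (k, pairs) => (k, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("Usedcar", "AmigaOS 1.3", "Amiga", 0, "SeaMonkey", "a.example"),
      Event("Usedcar", "AmigaOS 1.3", "Amiga", 0, "SeaMonkey", "b.example"),
      Event("Usedcar", "Ubuntu 14.04", "ubuntu", 0, "Firefox", "a.example"))

    // Each PageCount has named fields instead of positional _1, _2, ...
    val docs = countByKey(events).map { case (k, n) => toDoc(k, n) }.toSeq
    docs.foreach(println)
  }
}
```

In the streaming job, the same two functions would presumably be applied as lines.map(e => (toKey(e), 1)).reduceByKey(_ + _).map { case (k, n) => toDoc(k, n) } before calling EsSpark.saveToEs, which, as far as the elasticsearch-hadoop documentation describes, serializes case classes using their field names.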
You can apply whatever filter you want before writing to Elasticsearch.
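pageCounts.foreachRDD { rdd =>
  // Keep only the records whose key has platform "ubuntu"
  val ubuntuOnly = rdd.filter { case (key, _) => key.platform == "ubuntu" }
  // Check for emptiness after filtering, so that an empty RDD is never sent to ES
  if (!ubuntuOnly.isEmpty()) {
    EsSpark.saveToEs(ubuntuOnly, "spark/ElasticSearch")
  }
}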
PS: If you test the pair RDD with a case class as the key, as I have suggested (lines.map(x => (x, 1)).reduceByKey(_ + _)), be aware that there is a bug specific to the Spark shell: case classes don't work correctly as key classes for reduce operations there - jira issue
Upvotes: 1