Naresh

Reputation: 5397

Mapping field names of the output from Spark-Streaming to Elastic Search

I am using the following code to store the output of Spark Streaming in Elasticsearch. I want the output of Spark Streaming to be mapped to the proper field names, i.e. (Key, OsName, PlatFormName, Mobile, BrowserName, Count), but as you can see it is currently stored in ES under keys like _1, _2, etc. Moreover, I want to apply a filter (e.g. index the data only if PlatFormName = "ubuntu") before indexing it in ES. So, how do I do that?

    val realTimeAgg = lines.map { x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1) }
                           .reduceByKey(_ + _)

    val pageCounts = realTimeAgg

    pageCounts.foreachRDD { x =>
        if (x.toLocalIterator.nonEmpty) {
            EsSpark.saveToEs(x, "spark/ElasticSearch")
        }
    }

    ssc.start()
    ssc.awaitTermination()

Output in Elasticsearch:

{
            "_index": "spark",
            "_type": "ElasticSearch",
            "_id": "AVTH0JPgzgtrAOUg77qq",
            "_score": 1,
            "_source": {
               "_1": {
                  "_3": "Amiga",
                  "_2": "AmigaOS 1.3",
                  "_6": "SeaMonkey",
                  "_1": "Usedcar",
                  "_4": 0,
                  "_5": 0
               },
               "_2": 1013
            }
         }

Upvotes: 1

Views: 674

Answers (1)

Pranav Shukla

Reputation: 2226

The keys of the Elasticsearch document are _1, _2, etc. because you are storing a PairRDD of type (Tuple6, Long), and tuple fields have no names.

To retain meaningful keys, you should use a case class as the key:

val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_)

I am assuming that the class of object x is a case class and that you want to use all of its fields for the reduction (i.e. for checking the equality of two instances). If all fields of that class are NOT the natural key to be used for equality, then you have two options:

  1. Override equals and hashCode for your case class
  2. Create another case class that only has the key fields (the fields you have used in your tuple - (x.key, x.os, x.platform, x.mobile, x.browser)) and map to that case class instead of a Tuple in the first line lines.map { x => ...}.
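A minimal sketch of option 2, with hypothetical names (PageKey and the sample values are made up for illustration). A case class gets structural equals and hashCode for free, so it works directly as a reduceByKey key, and Elasticsearch will keep its field names (key, os, platform, mobile, browser) instead of _1, _2, etc. Plain Scala collections stand in for the RDD here so the sketch runs without a Spark cluster:

```scala
// Hypothetical case class holding only the key fields (option 2).
case class PageKey(key: String, os: String, platform: String,
                   mobile: Int, browser: String)

// Sample input standing in for the stream; on an RDD you would write
// lines.map(x => (PageKey(x.key, x.os, x.platform, x.mobile, x.browser), 1))
//      .reduceByKey(_ + _)
val lines = Seq(
  PageKey("Usedcar", "AmigaOS 1.3", "Amiga", 0, "SeaMonkey"),
  PageKey("Usedcar", "AmigaOS 1.3", "Amiga", 0, "SeaMonkey"),
  PageKey("Newcar", "Linux", "ubuntu", 1, "Firefox")
)

// groupBy + sum mimics reduceByKey(_ + _): equal case class
// instances collapse into one key with an aggregated count.
val counts: Map[PageKey, Int] = lines
  .map(k => (k, 1))
  .groupBy(_._1)
  .map { case (k, pairs) => (k, pairs.map(_._2).sum) }
```

Saving a pair RDD of (PageKey, Long) with EsSpark.saveToEs would then produce documents whose _1 object carries the case class field names.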

You can add the filter you want before writing to Elasticsearch:

    pageCounts.foreachRDD { x =>
        if (x.toLocalIterator.nonEmpty) {
            val y = x.filter(z => z._1.platform == "ubuntu")
            EsSpark.saveToEs(y, "spark/ElasticSearch")
        }
    }

PS: If you are testing the pair RDD with a (case class, Long) key as I have suggested (lines.map(x => (x, 1)).reduceByKey(_ + _)) in the Spark shell, be aware that there is a bug specific to the Spark shell where case classes don't work correctly as key classes for reduce operations: jira issue

Upvotes: 1
