Reputation: 1367
I am trying to save the output of an RDD into Elasticsearch, but when I try to send it I get a compile error, even after including a few elasticsearch-spark libraries. I am new to Elasticsearch and any help would be highly appreciated. Thanks.
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ElasticSpark {
  def main(args: Array[String]) {
    val logfile = "/Users/folder/Desktop/logfile.rtf"
    // setMaster accepts local[*] with any number of cores, a Spark cluster URL, Mesos, etc.
    val conf = new SparkConf().setMaster("local[1]").setAppName("RddTest")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val logdata = sc.textFile(logfile)
    val numA = logdata.filter(line => line.contains("a")).count()
    val wordcount = logdata.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    wordcount.collect().foreach(println) // print each (word, count) pair
    wordcount.saveAsTextFile("/Users/folder/Desktop/sample") // success
    wordcount.saveToEs("spark/docs") // fails to compile
  }
}
Error:(21, 15) value saveToEs is not a member of org.apache.spark.rdd.RDD[(String, Int)]
    wordcount.saveToEs("spark/docs")
              ^
Error:(6, 12) object elasticsearch is not a member of package org
import org.elasticsearch.spark._
       ^
Upvotes: 4
Views: 3620
Reputation: 2967
ES support is not part of the Spark distribution; it is provided by elasticsearch-hadoop, so you need to supply this dependency yourself. If you use Maven, add this to your pom.xml:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.2.0</version>
</dependency>
For sbt, add to build.sbt:
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.2.0" % "compile"
resolvers ++= Seq(
  "clojars" at "https://clojars.org/repo",
  "conjars" at "http://conjars.org/repo",
  "plugins" at "http://repo.spring.io/plugins-release",
  "sonatype" at "http://oss.sonatype.org/content/groups/public/")
Upvotes: 2