Reputation: 137
I'm trying to run a simple Spark program that copies the contents of an RDD into an Elasticsearch index. Both Spark and Elasticsearch are installed on my local machine.
```scala
import org.elasticsearch.spark.sql._
import org.apache.spark.sql.SparkSession

object ES {
  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession
        .builder().master("local[*]")
        .appName("SparkEs")
        .config("es.index.auto.create", "true")
        .config("es.nodes", "localhost:9200")
        .getOrCreate()
    import spark.implicits._
    val lines = spark.sparkContext.textFile("/home/herch/fakefriends.csv")
    val people = lines.map(mapper).toDF()
    people.saveToEs("spark/people")
  }
}
```
After multiple retries, I get this error:
```
INFO HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request:Connection timed out (Connection timed out)
INFO HttpMethodDirector: Retrying request
INFO DAGScheduler: ResultStage 0 (runJob at EsSparkSQL.scala:97) failed in 525.902 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[192.168.0.22:9200]]
```
It seems to be a connection problem, but I can't identify the cause. Elasticsearch is running on my local machine at localhost:9200, and I'm able to query it from the terminal.
Upvotes: 1
Views: 4205
Reputation: 111
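Since you are running both locally, you need to set es.nodes.wan.only to true (default false) in your SparkConf. With this setting enabled, the connector disables node discovery and connects only through the addresses declared in es.nodes. With discovery on, it connects to the addresses the cluster publishes for its nodes, which is why your log shows 192.168.0.22:9200 rather than localhost. I ran into exactly the same problem, and that fixed it.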
See: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
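Applied to the program in the question, the fix amounts to one extra .config(...) call on the SparkSession builder. The sketch below is the question's code with that line added; it assumes the same local setup (Elasticsearch listening on localhost:9200, the CSV at the question's path) and that the elasticsearch-spark connector matching your Spark and Scala versions is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object ES {
  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  // Parse one CSV line of the form "ID,name,age,numFriends" into a Person.
  def mapper(line: String): Person = {
    val fields = line.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkEs")
      .config("es.index.auto.create", "true")
      .config("es.nodes", "localhost:9200")
      // Disable node discovery so the connector only uses the address in
      // es.nodes, not the addresses the cluster publishes (e.g. 192.168.0.22).
      .config("es.nodes.wan.only", "true")
      .getOrCreate()
    import spark.implicits._

    val lines = spark.sparkContext.textFile("/home/herch/fakefriends.csv")
    val people = lines.map(mapper).toDF()
    people.saveToEs("spark/people")
    spark.stop()
  }
}
```

Because the setting is placed on the builder, it applies to every Elasticsearch read and write in the session, so no per-call options are needed.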
Upvotes: 1
Reputation: 9308
As shown on the Elasticsearch Spark connector documentation page, you need to pass the host and the port as separate settings in the configuration:
```scala
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",
                    "es.nodes" -> "someNode", "es.port" -> "9200")
```
Note that es.nodes contains only the host name, while es.port contains the HTTP port.
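For a write like the one in the question, a map of this shape can be passed straight to saveToEs, whose overload saveToEs(resource, cfg) merges the given settings with those from the SparkConf. A minimal sketch, assuming Elasticsearch on localhost and the elasticsearch-spark connector on the classpath; the object name EsSeparateHostPort and the two sample rows are hypothetical stand-ins for the question's CSV data.

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._

object EsSeparateHostPort {
  // Host and port given as separate connector settings,
  // following the shape of the options13 example.
  val esOptions: Map[String, String] = Map(
    "es.nodes" -> "localhost", // host name only, no ":9200" suffix
    "es.port"  -> "9200"       // HTTP port of Elasticsearch
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkEs")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for fakefriends.csv.
    val people = Seq((0, "Alice", 33, 10), (1, "Bob", 26, 2))
      .toDF("ID", "name", "age", "numFriends")

    // The settings in esOptions are merged with the SparkConf settings for this write.
    people.saveToEs("spark/people", esOptions)
    spark.stop()
  }
}
```

Passing the settings per call keeps the connection details next to the write that uses them, at the cost of repeating them for every read or write.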
Upvotes: 0