Chris Bedford

Reputation: 2692

elasticsearch-hadoop spark connector unable to connect/write using out-of-box ES server setup, & default library settings

I had some problems using the Elasticsearch connector for Spark described here: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html. I could not even get the examples on their page working with a plain vanilla instance of Elasticsearch 7.4.0 that I downloaded and started via

<downloadDir>/bin/elasticsearch 

Here is what I did to run. I started Spark via the command:

spark-shell --packages "org.elasticsearch:elasticsearch-hadoop:7.4.0"

Then I typed in the lines from the code given on the documentation page referenced above:

import org.apache.spark.SparkContext        // attempt 1
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

spark.sparkContext.makeRDD( Seq(numbers, airports)).saveToEs("spark/docs")

I got some strange errors indicating the connector was trying to reach a node other than the default [127.0.0.1:9200], and then failing even against that node:

[Stage 0:>                                                        (0 + 12) / 12]20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]
20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]

Note that if I type http://127.0.0.1:9200/ in my browser URL bar I get back a JSON doc indicating the cluster is up on localhost:9200. So, I'm stumped! Any guidance much appreciated.

** UPDATE **

I tried the answer suggested by Mikalai (I had to call saveToEs on an RDD rather than a DataFrame, since the DataFrame version didn't compile for some reason). Unfortunately, I got the same result.

import org.apache.spark.rdd.RDD   // attempt 2
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

object classes {
  case class AlbumIndex(group: String, year: Int, title: String)
}
object App extends App {
  import classes._
  val spark = SparkSession.builder()
    .appName("writetoes")
    .master("local[*]")
    .config("spark.es.nodes", "localhost")
    .config("spark.es.port", "9200")
    .getOrCreate()
  val indexDocuments: Seq[AlbumIndex] = Seq(
      AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
      AlbumIndex("Boston",1976,"Boston"),
      AlbumIndex("Fleetwood Mac", 1979,"Tusk")
  )
  val rdd: RDD[AlbumIndex] = spark.sparkContext.makeRDD( indexDocuments)
  rdd.saveToEs("demoindex/albumindex")
}

Upvotes: 2

Views: 2069

Answers (3)

Chris Bedford

Reputation: 2692

So, the problem was that I had another instance of Elasticsearch running in another window and listening on the same port. That always hoses things in strange ways. So... this adapter has no problem at all. The problem was me.
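
For anyone who runs into the same thing, a quick way to see which processes are using the port (on Linux/macOS) is something like:

lsof -i :9200

If more than one Elasticsearch process shows up, that's a good hint a stray instance is still running.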

Upvotes: 1

C0M37

Reputation: 574

Note that 172.16.0.0/12 is a private IP address range. Most likely your Elasticsearch node is picking up one of those addresses as its bind/publish address instead of 127.0.0.1. ES-Hadoop/Spark attempts to "discover" your cluster before doing any writing to it. Part of that discovery process involves contacting a node at random from the list of nodes given and asking it for the addresses of all nodes in the cluster. It's likely that your Elasticsearch node thinks it should be reachable on 172.x.x.x, and the connector picks that address up during discovery and tries to use it for all communication going forward, even though a connection cannot be established to that address (for any number of reasons).
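
As a quick sanity check (a sketch, assuming the node answers on 127.0.0.1:9200 as described in the question), you can ask Elasticsearch which HTTP address it publishes; if publish_address shows a 172.x.x.x address, that is what the connector will discover:

import scala.io.Source

// Ask the local node which HTTP address it advertises to clients;
// look for "publish_address" in the JSON that comes back.
val nodesInfo = Source.fromURL("http://127.0.0.1:9200/_nodes/http").mkString
println(nodesInfo)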

You should be able to disable node discovery for these sorts of local runs. This switches the ES-Hadoop/Spark connector to not try to find any nodes in the cluster beyond the ones already specified in the es.nodes setting. You can do this by setting the es.nodes.discovery property to false. In Spark you need to prefix it with spark., or else Spark will throw the property out.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
      .appName("my-app")
      .config("spark.es.nodes", "localhost")
      .config("spark.es.port", "9200")
      .config("spark.es.nodes.discovery", false)  // don't probe the cluster for additional nodes
      .getOrCreate()
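
The same settings should also work for a spark-shell run like the one in the question, passed as --conf flags (a sketch using the spark.-prefixed property names from above):

spark-shell --packages "org.elasticsearch:elasticsearch-hadoop:7.4.0" \
  --conf spark.es.nodes=localhost \
  --conf spark.es.port=9200 \
  --conf spark.es.nodes.discovery=false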

Upvotes: 2

You need to configure the Elasticsearch IP and port where it is running. Please see the code below; I think this will help you.

val spark = SparkSession
    .builder()
    .appName("writetoes")
    .master("local[*]")
    .config("spark.es.nodes", "localhost")  // your Elasticsearch node IP
    .config("spark.es.port", "9200")        // the port it is listening on
    .getOrCreate()

import spark.implicits._
import org.elasticsearch.spark.sql._  // provides saveToEs on DataFrames/Datasets

// AlbumIndex is the case class defined in the question.
val indexDocuments = Seq(
    AlbumIndex("Led Zeppelin", 1969, "Led Zeppelin"),
    AlbumIndex("Boston", 1976, "Boston"),
    AlbumIndex("Fleetwood Mac", 1979, "Tusk")
).toDF

indexDocuments.saveToEs("demoindex/albumindex")

Upvotes: 0
