Reputation: 115
I'm having problems connecting to an ElasticSearch node running locally from my Java code, which runs as a job submitted to Spark (also running locally). When I don't use Spark, the connection is no problem. Running a Python job and submitting it to Spark also works fine.
I know for Java I need to connect through port 9300 instead of 9200 (the HTTP port). Nevertheless, I always get the same exception, whether I am reading or writing:
16/08/04 16:51:55 ERROR NetworkClient: Node [The server localhost failed to respond with a valid HTTP response] failed (localhost:9300); no other nodes left - aborting...
Exception in thread "main" org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9300]]
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:102)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:282)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:266)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:270)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:108)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:90)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:415)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
    at org.apache.spark.api.java.JavaPairRDD.first(JavaPairRDD.scala:211)
    at com.dd.mediaforce.spark.most_popular.ExecutorMostPopular.main(ExecutorMostPopular.java:564)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
We are also running Spark and ElasticSearch on a number of nodes. The Python code runs fine there, but trying the Java code against that ES setup did not solve the problem either.
The code I'm using to connect from Java:
SparkConf _sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Test");
JavaSparkContext jsc = new JavaSparkContext(_sparkConf);
Configuration conf = new Configuration();
conf.set("cluster.name", "our_clustername");
conf.set("es.nodes", "localhost");
conf.setInt("es.port", 9300);
conf.set("es.resource", index_and_type);
JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf, org.elasticsearch.hadoop.mr.EsInputFormat.class, org.apache.hadoop.io.NullWritable.class, org.elasticsearch.hadoop.mr.LinkedMapWritable.class);
System.out.println(readRdd.first());
jsc.stop();
The following Java code, which uses the TransportClient and no Spark, has (as mentioned) no problem connecting to ES; both writing and reading work fine:
Client client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
ImmutableOpenMap<String, IndexMetaData> indices = client.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
for (ObjectCursor<IndexMetaData> value : indices.values()) {
log.info("Index: " + value.index + " : " + value.toString());
}
GetResponse response = client.prepareGet("index_name", "type_name", "1").get();
log.info(response.getIndex() + " : " + response.getId() + " : " + response.isExists());
String field_id = "6";
IndexRequest indexRequest = new IndexRequest("index_name", "type", "2")
.source(jsonBuilder()
.startObject()
.prettyPrint()
.field("field_id", field_id)
.field("another_field", "value")
.field("integer_field", 100)
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index_name", "type_name", article_id)
.doc(jsonBuilder()
.startObject()
.prettyPrint()
.field("field_id", field_id)
.field("another_field", "value")
.field("integer_field", 100)
.endObject())
.upsert(indexRequest);
UpdateResponse responseUpdate = client.update(updateRequest).get();
log.info(responseUpdate.getIndex() + " : " + responseUpdate.getGetResult() + " : " + responseUpdate.getType());
client.close();
Any suggestion is welcome. I've been stuck on this for days and don't feel I'm getting any further. I've Googled the problem and searched StackOverflow, but so far I haven't found an answer.
For completeness, here is some of the Python code, which also works fine reading from and writing to ES using Spark:
conf = SparkConf()
conf = conf.setAppName('Test')
sc = SparkContext(conf=conf)
#Omitting some of the code in creating some_rdd on Spark:
index_and_type = index_name + '/type_name'
groovy_script = "if (ctx._source.%s) { ctx._source.%s+=value } else { ctx._source.%s=value }" % (field, field, field)
es_db_connection_dictionary = {
"es.nodes": db_hosts,
"es.port": db_port,
"es.resource": index_and_type,
"es.write.operation": "upsert",
"es.mapping.id": "field_id",
"es.update.script": groovy_script,
"es.update.script.params": "value:%s" % integer_field,
"es.http.timeout": "10s"
}
es_input = views_tuple_rdd.map(lambda item: (item[0],
{
'field_id': item[0],
"integer_field": item[1],
"another_field": client_name,
}))
es_input.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_db_connection_dictionary)
Upvotes: 3
Views: 7029
Reputation: 40360
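Normally, if you are using the elasticsearch-spark connector, you don't need port 9300. The connector behaves differently from the regular Elasticsearch Java API: it talks to the cluster over HTTP/REST, so it should point at the HTTP port, which is 9200 by default. The error message itself says that localhost:9300 "failed to respond with a valid HTTP response".
It also seems that you were using a version of the connector that is incompatible with your Elasticsearch version. This is a common mistake, since most setups are still on 2.x, where the connector and Elasticsearch versions are not aligned.
I believe this would not be an issue with Elasticsearch 5.x, since the version numbers of all the other Elastic products have been aligned with it.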
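To make the port point concrete, here is a minimal sketch of the connector settings from the question with the HTTP port instead of the transport port. The class name EsConnectorSettings and the helper build are hypothetical names used only for illustration, and a plain Map is used so the snippet runs without Spark or Hadoop on the classpath. In the actual job, each entry would presumably be copied into the Hadoop Configuration with conf.set(key, value) before calling newAPIHadoopRDD.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of elasticsearch-hadoop: it only collects
// the es.* settings used in the question into a plain map.
public class EsConnectorSettings {

    // Elasticsearch's default HTTP/REST port. The connector speaks REST,
    // so it should not be pointed at the 9300 transport port.
    static final int DEFAULT_HTTP_PORT = 9200;

    // Builds the connector settings for the given node list and "index/type" resource.
    static Map<String, String> build(String nodes, String resource) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("es.nodes", nodes);
        settings.put("es.port", Integer.toString(DEFAULT_HTTP_PORT));
        settings.put("es.resource", resource);
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> settings = build("localhost", "index_name/type_name");
        // In the Spark job, these entries would be applied to the Hadoop
        // Configuration (assumption: conf.set(key, value) for each entry).
        settings.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Since 9200 is the default, leaving es.port out altogether should have the same effect, assuming the node's HTTP port has not been changed.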
Upvotes: 2