Reputation: 102
I'm trying to write data from Spark Structured Streaming 2.2.1 to Elasticsearch (ES) 6.1.1 using ES-Hadoop, with X-Pack installed on the cluster. This is my code (the index already exists in Elasticsearch):
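import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Read each line of the files under `path` as a streaming text source
val exceptions = spark
  .readStream
  .text(path)

// Configure the Elasticsearch sink, flushing a micro-batch every 10 seconds
val advancedQuery = exceptions
  .writeStream
  .format("org.elasticsearch.spark.sql")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .option("checkpointLocation", "/checkpoint")

// Start writing to index "spark", type "exc", and block until the query stops
val runningQuery = advancedQuery.start("spark/exc")
runningQuery.awaitTermination()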
But I get the following exception:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:327)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:575)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
**Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: missing authentication token for REST request [/]
null**
How can I set the required authentication data?
Upvotes: 0
Views: 730
Reputation: 102
Figured it out: you need to add two additional options to the writer, "es.net.http.auth.user" and "es.net.http.auth.pass", like this:
val advancedQuery = exceptions
...
.option("es.net.http.auth.user", "*your elastic user goes here*")
.option("es.net.http.auth.pass", "*your elastic password goes here*")
...
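To avoid hard-coding the credentials in the job, the two options could be built in one place and passed to the writer as a map. The following is only a minimal sketch: the object name `EsAuth` and the environment variable names `ES_USER` and `ES_PASS` are my own assumptions, not part of ES-Hadoop. Only the two option keys come from the answer above.

```scala
// Sketch: build the ES-Hadoop HTTP basic-auth options in one place.
// EsAuth, ES_USER and ES_PASS are hypothetical names chosen for illustration.
object EsAuth {
  /** Returns the two basic-auth settings as an options map. */
  def options(user: String, pass: String): Map[String, String] = Map(
    "es.net.http.auth.user" -> user,
    "es.net.http.auth.pass" -> pass
  )

  /** Reads the credentials from the environment; None if either one is missing. */
  def fromEnv(env: Map[String, String] = sys.env): Option[Map[String, String]] =
    for {
      user <- env.get("ES_USER")
      pass <- env.get("ES_PASS")
    } yield options(user, pass)
}

// Applied to the writer from the question (needs Spark and ES-Hadoop on the classpath):
//   exceptions.writeStream
//     .format("org.elasticsearch.spark.sql")
//     .options(EsAuth.options("<user>", "<password>"))
```

`DataStreamWriter.options` accepts a map of strings, so the result can be chained alongside the existing `.option(...)` calls.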
Upvotes: 1