Matthias Mueller

Reputation: 102

How to set up Elasticsearch Structured Streaming with X-Pack enabled?

I'm trying to write data to Elasticsearch (ES) 6.1.1, which has X-Pack installed, using ES-Hadoop and Spark Structured Streaming 2.2.1. This is my code (the index already exists in Elasticsearch):

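import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Read the text files that appear under `path` as a stream of lines
val exceptions = spark
  .readStream
  .text(path)
// Configure the Elasticsearch sink with a 10-second processing-time trigger
val advancedQuery = exceptions
  .writeStream
  .format("org.elasticsearch.spark.sql")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .option("checkpointLocation", "/checkpoint")
// Start writing to the resource "spark/exc" (index/type)
val runningQuery = advancedQuery.start("spark/exc")
runningQuery.awaitTermination()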

But I get the following exception:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:327)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:575)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:58)
    at org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run(EsStreamQueryWriter.scala:41)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:52)
    at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink$$anonfun$addBatch$2$$anonfun$2.apply(EsSparkSqlStreamingSink.scala:51)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: missing authentication token for REST request [/]
null

How can I set the required authentication data?

Upvotes: 0

Views: 730

Answers (1)

Matthias Mueller

Reputation: 102

Figured it out: you need to set two additional options on the writer, "es.net.http.auth.user" and "es.net.http.auth.pass", like this:

val advancedQuery = exceptions
  ...
  .option("es.net.http.auth.user", "*your elastic user goes here*")
  .option("es.net.http.auth.pass", "*your elastic password goes here*")
  ...
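
For reference, here is how the whole thing might look when put together with the code from the question. This is only a sketch under some assumptions: the helper `esAuthOptions`, the object name, and the environment variable names `ES_USER` and `ES_PASS` are made up for illustration, and the input directory is taken from the first program argument. It needs spark-sql and the elasticsearch-spark connector on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

object EsAuthStreamingSketch {

  // Hypothetical helper: bundles the two ES-Hadoop authentication options
  // so they can be passed to the writer in a single .options(...) call.
  def esAuthOptions(user: String, pass: String): Map[String, String] = Map(
    "es.net.http.auth.user" -> user,
    "es.net.http.auth.pass" -> pass
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("es-auth-sketch").getOrCreate()

    // Assumed variable names; fail fast if the credentials are missing
    // instead of hard-coding them in the source.
    val user = sys.env.getOrElse("ES_USER", sys.error("ES_USER is not set"))
    val pass = sys.env.getOrElse("ES_PASS", sys.error("ES_PASS is not set"))

    // Assumption: the input directory is given as the first argument.
    val exceptions = spark.readStream.text(args(0))

    val runningQuery = exceptions.writeStream
      .format("org.elasticsearch.spark.sql")
      .trigger(Trigger.ProcessingTime(10.seconds))
      .option("checkpointLocation", "/checkpoint")
      .options(esAuthOptions(user, pass)) // the two auth options from above
      .start("spark/exc")

    runningQuery.awaitTermination()
  }
}
```

Passing the credentials through `.options(...)` is equivalent to the two separate `.option(...)` calls shown above; reading them from the environment just keeps the password out of the code.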

Upvotes: 1
