matlabit

Reputation: 858

Spark Streaming: Write dataframe to ElasticSearch

I'm using the following code to write a stream to Elasticsearch from a Python (PySpark) application.

#Streaming code
query = df.writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "/tmp/") \
    .option("es.resource", "logs/raw") \
    .option("es.nodes", "localhost") \
    .start()

query.awaitTermination()

Writing the results to the console works fine, and writing to ES in non-streaming (batch) mode also works. This is the code I used to write to ES:

#Not streaming
df.write.format("org.elasticsearch.spark.sql") \
    .mode('append') \
    .option("es.resource", "log/raw") \
    .option("es.nodes", "localhost") \
    .save("log/raw")

The thing is, I can't debug it: the code runs without errors, but nothing is written to ES in streaming mode.
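For anyone hitting the same wall: since the console sink works here, one way to narrow it down is to temporarily route the same stream to the console sink and confirm rows are actually flowing before suspecting the ES connector. A minimal sketch, assuming `df` is the same streaming DataFrame as above and a Spark session is already running:

    # Sketch: swap the ES sink for the console sink to verify the stream
    # itself is producing rows (assumes `df` is a streaming DataFrame).
    debug_query = df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", "false") \
        .start()

    # Run for up to 30 seconds, then stop the debug query.
    debug_query.awaitTermination(30)
    debug_query.stop()

If rows show up on the console but still not in ES, the problem is on the connector/network side (as it turned out to be in the accepted self-answer below).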

Thanks,

Upvotes: 4

Views: 5656

Answers (2)

arctic_Oak

Reputation: 1024

Code:

val stream = df
      .writeStream
      .option("checkpointLocation", checkPointDir)
      .format("es")
      .start("realtime/data")

SBT Dependency:

libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"

Upvotes: 2

matlabit

Reputation: 858

Eventually it did work for me; the problem turned out to be environmental rather than in the code (I needed a VPN to reach the cluster).

query = df.writeStream \
    .outputMode("append") \
    .queryName("writing_to_es") \
    .format("org.elasticsearch.spark.sql") \
    .option("checkpointLocation", "/tmp/") \
    .option("es.resource", "index/type") \
    .option("es.nodes", "localhost") \
    .start()

query.awaitTermination()

Upvotes: 3
