Reputation: 858
I'm using the following code to write a stream to Elasticsearch from a Python (PySpark) application:
#Streaming code
query = df.writeStream \
.outputMode("append") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "/tmp/") \
.option("es.resource", "logs/raw") \
.option("es.nodes", "localhost") \
.start()
query.awaitTermination()
If I write the results to the console, it works fine. Writing to ES in non-streaming (batch) mode also works. This is the code I used for the batch write:
#Not streaming
df.write.format("org.elasticsearch.spark.sql") \
.mode('append') \
.option("es.resource", "log/raw") \
.option("es.nodes", "localhost").save("log/raw")
The problem is that I can't debug it: the code runs, but in streaming mode nothing is written to ES.
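Because `query.awaitTermination()` blocks without printing anything, one way to see what a streaming query is doing is to poll it. The sketch below relies only on the PySpark `StreamingQuery` members `isActive`, `status`, `lastProgress` and `exception()`; the helper name `watch_query` and its parameters are hypothetical. If `numInputRows` stays at 0 batch after batch, that suggests no data is arriving from the source; if the query stops, `exception()` reports why.

```python
import time


def watch_query(query, poll_seconds=5.0, max_polls=None, log=print):
    """Hypothetical helper: poll a PySpark StreamingQuery and log its progress.

    Returns the query's exception (or None) once it stops being active,
    or after max_polls polls if a limit is given.
    """
    polls = 0
    while query.isActive:
        # status is a dict such as {"message": ..., "isDataAvailable": ...}
        log(f"status: {query.status}")
        # lastProgress is None until the first micro-batch has completed
        progress = query.lastProgress
        if progress:
            log(f"batch {progress['batchId']}: {progress['numInputRows']} input rows")
        polls += 1
        if max_polls is not None and polls >= max_polls:
            break
        time.sleep(poll_seconds)
    # exception() returns None unless the query terminated with an error
    err = query.exception()
    if err is not None:
        log(f"query failed: {err}")
    return err
```

In the question's code, `watch_query(query, poll_seconds=10)` would take the place of `query.awaitTermination()` while debugging.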
Thanks,
Upvotes: 4
Views: 5656
Reputation: 1024
Code:
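// df must be a streaming DataFrame (created with spark.readStream)
val checkPointDir = "/tmp/es-checkpoint" // hypothetical path; use a dedicated directory per query
val stream = df
  .writeStream
  .option("checkpointLocation", checkPointDir)
  .format("es")
  .start("realtime/data") // Elasticsearch resource ("index/type")
stream.awaitTermination() // keep the driver alive while the query runs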
SBT Dependency:
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"
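For PySpark, the Scala answer translates roughly as follows. This is a sketch that assumes the elasticsearch-spark connector is already on the classpath, for example via `spark-submit --packages org.elasticsearch:elasticsearch-spark-20_2.11:6.2.4` (assuming Scala 2.11, i.e. the `_2.11` suffix that SBT's `%%` adds). The helper name `start_es_stream` and its `nodes`/`port` defaults are hypothetical; `es.nodes` and `es.port` are standard ES-Hadoop settings, and append is already the default output mode.

```python
def start_es_stream(df, resource, checkpoint_dir, nodes="localhost", port="9200"):
    """Hypothetical helper: start a Structured Streaming write of df to Elasticsearch.

    resource is the "index/type" target, passed as the path to start(),
    mirroring .start("realtime/data") in the Scala answer.
    """
    es_options = {
        "checkpointLocation": checkpoint_dir,  # a dedicated directory per query
        "es.nodes": nodes,
        "es.port": port,
    }
    return (
        df.writeStream
        .options(**es_options)
        .format("es")
        .start(resource)
    )
```

Usage would look like `query = start_es_stream(df, "realtime/data", "/tmp/es-checkpoint")` followed by `query.awaitTermination()`.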
Upvotes: 2
Reputation: 858
It eventually worked for me. The problem was technical (I needed a VPN). This is the working code:
query = df.writeStream \
.outputMode("append") \
.queryName("writing_to_es") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "/tmp/") \
.option("es.resource", "index/type") \
.option("es.nodes", "localhost") \
.start()
query.awaitTermination()
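Since the root cause here turned out to be network access (a VPN), a quick pre-flight check before starting the stream can rule that out early instead of leaving a silent query running. This sketch only tests that a TCP connection to the node can be opened; it does not confirm the service is Elasticsearch. The helper `es_reachable` is hypothetical, and port 9200 (Elasticsearch's default HTTP port) is an assumption.

```python
import socket


def es_reachable(host="localhost", port=9200, timeout=3.0):
    """Hypothetical helper: return True if a TCP connection to host:port opens."""
    try:
        # create_connection raises OSError (incl. timeouts, refused, DNS errors)
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, call `es_reachable("localhost", 9200)` before `df.writeStream ... .start()` and stop with a clear error message if it returns `False`.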
Upvotes: 3