Reputation: 27
I am trying to write a Spark DataFrame (df) to an Elasticsearch cluster.
I created an index named "spark", but when I ran this command:
(
    df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes.wan.only", "true")
    .option("es.port", "9092")
    .option("es.net.ssl", "true")
    .option("es.nodes", "localhost")
    .save("spark/docs")
)
I came across this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o144.save.
: java.lang.NoClassDefFoundError: scala/Product$class
Spark version: spark-3.0.0-bin-hadoop2.7
Elasticsearch version: elasticsearch-7.7.0
Dependencies added: elasticsearch-hadoop-7.7.0.jar
Upvotes: 2
Views: 3069
Reputation: 1962
I believe you should specify es.resource on write; the format can be given as es. The following worked for me on Spark 2.4.5 (running on Docker) and ES version 7.5.1. First of all, make sure you're running pyspark with the following package:
export PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell"
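If the shell environment is not easy to change (for example inside a notebook), the same variable can be set from Python before the first SparkContext is created. This is a minimal sketch; the helper name build_submit_args is hypothetical and not part of PySpark.

```python
import os


def build_submit_args(connector_version, extra_packages=()):
    """Return a PYSPARK_SUBMIT_ARGS value that pulls in elasticsearch-hadoop.

    build_submit_args is a hypothetical helper, not a PySpark API.
    """
    packages = ["org.elasticsearch:elasticsearch-hadoop:" + connector_version]
    packages.extend(extra_packages)
    return "--packages " + ",".join(packages) + " pyspark-shell"


# This has to run before any SparkContext exists in the process;
# the variable is only read when the JVM gateway is launched.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args("7.5.1")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

One likely reason this setup worked on Spark 2.4.5 while the question hit scala/Product$class on Spark 3.0.0 is a Scala version mismatch: Spark 3.0.0 is built against Scala 2.12, whereas that class name comes from code compiled for Scala 2.11. If that is the cause, a connector build matching the cluster's Scala version would be needed; which release that is should be checked against the elasticsearch-hadoop documentation.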
On the PySpark side, for example in a notebook:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local").setAppName("ES Test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "elasticsearch")  # name of my docker container, you might keep localhost
conf.set("es.port", "9200")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Two rows of 11 integer columns named col_1 .. col_11
colnames = ['col_' + str(i + 1) for i in range(11)]
df1 = sc.parallelize([
    list(range(11)),
    list(range(1, 12)),
]).toDF(colnames)

(
    df1
    .write
    .format('es')
    .option('es.resource', '%s/%s' % ('<resource_name>', '<table_name>'))
    .save()
)
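The connector settings can also be passed per write instead of through SparkConf. The sketch below only builds the option dictionary, so it runs without a cluster; es_write_options is a hypothetical helper, and the "spark" and "docs" values are taken from the question purely for illustration.

```python
def es_write_options(index, doc_type, nodes="localhost", port=9200):
    """Collect elasticsearch-hadoop settings for a single DataFrame write.

    es_write_options is a hypothetical helper. 9200 is Elasticsearch's
    default HTTP port; change it if the cluster listens elsewhere.
    """
    return {
        "es.resource": "%s/%s" % (index, doc_type),
        "es.nodes": nodes,
        "es.port": str(port),  # option values are passed as strings
    }


opts = es_write_options("spark", "docs")
print(opts)

# With a live session and the connector on the classpath, the options
# could be applied in one call:
# df1.write.format("es").options(**opts).save()
```

Keeping the settings in one dictionary makes it easier to compare them against what the cluster actually exposes. The question used port 9092, which is only correct if Elasticsearch was configured to listen there.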
Additional: I verified that the data was written, using the elasticsearch Python package:
from elasticsearch import Elasticsearch

esclient = Elasticsearch(['elasticsearch:9200'])
response = esclient.search(
    index='<resource_name>*',
    body={
        # Field names must match the DataFrame columns (col_1 .. col_11)
        "query": {
            "match": {
                "col_1": 1
            }
        },
        "aggs": {
            "test_agg": {
                "terms": {
                    "field": "col_1",
                    "size": 10
                }
            }
        }
    }
)
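To confirm the write, the matching documents and the aggregation buckets can be read out of the response dictionary. This is a rough sketch: summarize_search is a hypothetical helper, and the sample response is made up to mirror the shape of a search response rather than copied from a real cluster.

```python
def summarize_search(response, agg_name="test_agg"):
    """Return (documents, bucket counts) from a search response dict.

    summarize_search is a hypothetical helper, not part of the
    elasticsearch package.
    """
    # Each hit carries the indexed document under "_source".
    hits = [hit["_source"] for hit in response["hits"]["hits"]]
    # A terms aggregation returns a list of {"key", "doc_count"} buckets.
    buckets = response.get("aggregations", {}).get(agg_name, {}).get("buckets", [])
    counts = {bucket["key"]: bucket["doc_count"] for bucket in buckets}
    return hits, counts


# Made-up sample shaped like a search response, for illustration only.
sample = {
    "hits": {"hits": [{"_source": {"col_1": 1, "col_2": 2}}]},
    "aggregations": {"test_agg": {"buckets": [{"key": 1, "doc_count": 1}]}},
}
hits, counts = summarize_search(sample)
print(hits)
print(counts)
```

An empty hits list with no error would suggest the query field name does not match the indexed column names.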
Upvotes: 2