Raghvendra Tiwari

Reputation: 27

Integrating Spark with Elasticsearch

I am trying to write a Spark DataFrame (df) to an Elasticsearch cluster.

I created the index "spark", but when I ran this command:

    (
      df.write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes.wan.only", "true")
        .option("es.port", "9092")
        .option("es.net.ssl", "true")
        .option("es.nodes", "localhost")
        .save("spark/docs")
    )

I came across this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o144.save.
: java.lang.NoClassDefFoundError: scala/Product$class

Spark version: spark-3.0.0-bin-hadoop2.7

Elasticsearch version: elasticsearch-7.7.0

Dependencies added: elasticsearch-hadoop-7.7.0.jar

Upvotes: 2

Views: 3069

Answers (1)

Napoleon Borntoparty

Reputation: 1962

I believe you should specify es.resource on write; the format can be specified simply as es. The below worked for me on Spark 2.4.5 (running on Docker) and ES version 7.5.1. First of all, make sure you're running pyspark with the following package:

PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell"
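For example (illustrative; adjust the connector version to match your Elasticsearch cluster), one way to set this is to export it before launching pyspark or the notebook server:

```shell
# Illustrative: make the elasticsearch-hadoop connector available to the
# pyspark session that gets launched afterwards.
export PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell"
echo "$PYSPARK_SUBMIT_ARGS"
```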

On PySpark side, for example in a notebook:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local").setAppName("ES Test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "elasticsearch")  # name of my docker container, you might keep localhost
conf.set("es.port", "9200")

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

colnames = ['col_' + str(i + 1) for i in range(11)]  # col_1 .. col_11
df1 = sc.parallelize([
    [it for it in range(11)],      # 0 .. 10
    [it for it in range(1, 12)],   # 1 .. 11
]).toDF(colnames)
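Just to make the test data explicit (no Spark needed for this part): the two rows above are plain Python lists of 11 integers, one per column.

```python
# Same row construction as above, without Spark, to show the data shape.
colnames = ['col_' + str(i + 1) for i in range(11)]  # col_1 .. col_11
rows = [
    [it for it in range(11)],     # first row: 0 .. 10
    [it for it in range(1, 12)],  # second row: 1 .. 11
]
print(len(colnames), len(rows[0]), len(rows[1]))  # 11 11 11
```

So only the second row has col_1 == 1, which is what the verification query below looks for.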

(
  df1
  .write
  .format('es')
  .option(
    'es.resource', '%s/%s' % ('<resource_name>', '<table_name>'))
  .save()
)

Additionally, I verified the data was written using the elasticsearch Python package:

from elasticsearch import Elasticsearch
esclient = Elasticsearch(['elasticsearch:9200'])


response = esclient.search(
    index='<resource_name>*',
    body={
        "query": {
            "match": {
                "col_1": 1
            }
        },
        "aggs": {
            "test_agg": {
                "terms": {
                    "field": "col_1",
                    "size": 10
                }
            }
        }
    }
)
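To pull the hit count and aggregation buckets out of that response, you index into the returned dict. A minimal sketch, using a hand-built sample dict shaped like an ES 7.x search response (the values here are illustrative, not from a live cluster):

```python
# Hypothetical response dict with the shape an ES 7.x search returns,
# used to show how to read the hit count and aggregation buckets.
sample_response = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [{"_source": {"col_1": 1}}],
    },
    "aggregations": {
        "test_agg": {"buckets": [{"key": 1, "doc_count": 1}]},
    },
}

total = sample_response["hits"]["total"]["value"]
buckets = sample_response["aggregations"]["test_agg"]["buckets"]
print(total, buckets[0]["key"])  # 1 1
```

With the two-row DataFrame above, a match on col_1: 1 should return exactly one document (the second row).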

Upvotes: 2
