Sai

Reputation: 1117

How to implement index update functionality in elasticsearch using spark?

I am new to Elasticsearch. I have a huge amount of data to index using Elasticsearch.

I am using Apache Spark to index the data from a Hive table into Elasticsearch.

As part of this functionality, I wrote the simple Spark script below.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.elasticsearch.spark._

object PushToES {
  def main(args: Array[String]) {
    val Array(inputQuery, index, host) = args

    // Configure Spark and point the Elasticsearch connector at the cluster
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("PushToES")
    sparkConf.set("....", host)
    sparkConf.set("....", "9200")

    val sc = new SparkContext(sparkConf)
    val ht = new HiveContext(sc)

    // Run the Hive query and write the result to Elasticsearch as JSON documents
    val ps = ht.sql(inputQuery)
    ps.toJSON.saveJsonToEs(index)
  }
}

After that I am generating a jar and submitting the job using spark-submit:

spark-submit --jars ~/*.jar --master local[*] --class com.PushToES *.jar "select * from gtest where day=20170711" gest3 localhost

Then I am executing the below command to get the document count:

curl -XGET 'localhost:9200/test/test_test/_count?pretty'

The first time, it shows the count properly:

{
  "count" : 10,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  }
}

If I execute the same curl command a second time, it gives a result like below:

{
  "count" : 20,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  }
}

If I execute the same command a third time, I am getting:

{
  "count" : 30,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  }
}

But I do not understand why, on every run, it adds the new count to the count already in the index.

Please let me know how I can resolve this issue, i.e. no matter how many times I run the job, I should get the same value (the correct count, which is 10).

I am expecting the below result for this case because the correct count is 10. (I executed a count query on the Hive table and it returns count(*) as 10 every time.)

{
  "count" : 10,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  }
}

Thanks in advance.

Upvotes: 0

Views: 1004

Answers (1)

GPI

Reputation: 9348

If you want to "replace" the data on each run, rather than "append" to it, you have to configure your Spark/Elasticsearch properties for that scenario.

The first thing you need to do is have an ID in your documents, and tell Elasticsearch which field is your id "column" (if you come from a dataframe) or key (in JSON terms).

This is documented here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping namely es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field id as the document id, update the RDD configuration (it is also possible to set the property on the SparkConf though due to its global effect it is discouraged):

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
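
Applied to the script in your question, that would look roughly like the sketch below. It assumes the Hive query result contains a unique field named id; that field name is purely illustrative and must be replaced by a real key in your data.

import org.elasticsearch.spark._   // brings saveJsonToEs into scope

// Assumption: the query result has a unique "id" field that can serve as the
// Elasticsearch document _id. Re-running the job then overwrites the same
// documents instead of adding new ones, so the count stays at 10.
val ps = ht.sql(inputQuery)
ps.toJSON.saveJsonToEs(index, Map("es.mapping.id" -> "id"))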

A second configuration key is available to control what kind of operation elasticsearch tries to do when writing data, but the default is correct for your use case (a short sketch follows the quoted documentation below):

es.write.operation (default index)

The write operation elasticsearch-hadoop should perform - can be any of:

index (default) new data is added while existing data (based on its id) is replaced (reindexed).

create adds new data - if the data already exists (based on its id), an exception is thrown.

update updates existing data (based on its id). If no data is found, an exception is thrown.

upsert known as merge or insert if the data does not exist, updates if the data exists (based on its id).
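
If you ever need a behaviour other than the default, the operation is passed the same way as the id mapping. A minimal sketch, again assuming an illustrative "id" field, that switches to upsert:

// Hypothetical variant: upsert instead of the default "index" operation.
// For your use case the default is fine, so this setting is optional.
ps.toJSON.saveJsonToEs(index, Map(
  "es.mapping.id"      -> "id",
  "es.write.operation" -> "upsert"
))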

Upvotes: 1
