Timmeh

Reputation: 457

Reading data from ElasticSearch into a Spark Dataset

Using the elasticsearch-hadoop library, I would like to read data from Elasticsearch straight into a Spark Dataset. However, the API for that returns RDD[(String, Map[String, Any])], where the first element of the tuple is the document ID and the second (the map) is the data itself.

I would like to turn this into a Dataset[T], where T is some case class, to make the returned data easier to work with. I would consider either using some other library (I couldn't find any) or a neat code solution.
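For context, the raw read looks roughly like this; a minimal sketch, assuming the esRDD API from elasticsearch-spark and a placeholder index name "spark/index":

import org.apache.spark.SparkContext
import org.elasticsearch.spark._ // adds esRDD to SparkContext

def readRaw(sc: SparkContext) =
  sc.esRDD("spark/index") // RDD[(String, collection.Map[String, AnyRef])]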

Upvotes: 0

Views: 1534

Answers (2)

Timmeh

Reputation: 457

A colleague found a better solution in the docs:

import spark.implicits._
// equivalent, non-deprecated form of sqlContext.load(path, source)
spark.read.format("org.elasticsearch.spark.sql").load("spark/index").as[MyCaseClass]

Docs section here - https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources
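Put together end to end, it looks roughly like this; a minimal sketch where the index name "spark/index", the es.nodes address, and the MyCaseClass fields are all placeholders:

import org.apache.spark.sql.SparkSession

case class MyCaseClass(name: String, age: Long)

val spark = SparkSession.builder()
  .appName("es-to-dataset")
  .config("es.nodes", "localhost:9200") // where Elasticsearch is reachable
  .getOrCreate()
import spark.implicits._ // provides the Encoder for MyCaseClass

val ds = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("spark/index")
  .as[MyCaseClass]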

Upvotes: 0

Timmeh

Reputation: 457

I wrote a function stringMapRddToDataset to do this. I feel like there should be a nicer way to do it overall, though... I'm also concerned about the efficiency of this solution, but I haven't tested it on large amounts of data yet.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}

// Builds a Row whose values follow the field order of the target schema;
// nested StructType fields are converted recursively. Missing nested
// documents come back as null instead of throwing.
private def mapToSparkRow(map: collection.Map[String, Any], orderedFields: List[StructField]): Row = {
  val orderedValues = orderedFields.map { field =>
    val columnValue = map.getOrElse(field.name, null)
    field.dataType match {
      case nestedType: StructType if columnValue != null =>
        mapToSparkRow(columnValue.asInstanceOf[collection.Map[String, Any]], nestedType.toList)
      case _ => columnValue
    }
  }
  Row(orderedValues: _*)
}

// Derives the schema from T's Encoder, converts every map to a Row in that
// field order, and builds the Dataset[T] via an intermediate DataFrame.
def stringMapRddToDataset[T: Encoder](rdd: RDD[collection.Map[String, Any]])(
    implicit spark: SparkSession): Dataset[T] = {
  val encoder             = implicitly[Encoder[T]]
  val rddOfRows: RDD[Row] = rdd.map(mapToSparkRow(_, encoder.schema.toList))
  val df                  = spark.createDataFrame(rddOfRows, encoder.schema)
  df.as[T]
}
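For illustration, calling it could look like this; a sketch assuming esRDD from elasticsearch-spark, with the index name and case class fields as placeholders:

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

case class MyCaseClass(name: String, age: Long)

implicit val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._ // Encoder for MyCaseClass

val pairs = spark.sparkContext.esRDD("spark/index")      // RDD[(docId, fieldMap)]
val maps  = pairs.map(_._2: collection.Map[String, Any]) // drop the document IDs
val ds    = stringMapRddToDataset[MyCaseClass](maps)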

Upvotes: 0
