Reputation: 457
Using the elasticsearch-hadoop library, I would like to read data from Elasticsearch straight into a Spark Dataset. However, the API for that returns an RDD[(String, Map[String, Any])], where the first element of the tuple is the document ID and the second (the map) is the data itself.
I would like to turn this into a Dataset[T], where T is some case class, to make the returned data easier to work with. I would consider either using another library (I couldn't find one) or a neat code solution.
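For reference, a minimal sketch of the read that produces this shape, assuming the org.elasticsearch.spark API; the endpoint and the "spark/index" resource are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// "es.nodes" and the "spark/index" resource are placeholder values
val conf = new SparkConf()
  .setAppName("es-read")
  .set("es.nodes", "localhost:9200")
val sc = new SparkContext(conf)

// esRDD comes from the org.elasticsearch.spark implicits; each element
// pairs a document ID with the document source as a map
val raw = sc.esRDD("spark/index") // RDD[(String, collection.Map[String, AnyRef])]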
Upvotes: 0
Views: 1534
Reputation: 457
A colleague found a better solution in the docs:
import spark.implicits._

spark.read
  .format("org.elasticsearch.spark.sql")
  .load("spark/index")
  .as[MyCaseClass]
Docs section here - https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources
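A fuller, self-contained sketch under the same assumptions (the case class fields, app name, and es.nodes value here are hypothetical):

import org.apache.spark.sql.SparkSession

// Hypothetical shape of the documents in the index
case class MyCaseClass(field1: String, field2: Long)

val spark = SparkSession.builder()
  .appName("es-to-dataset")
  .config("es.nodes", "localhost:9200") // placeholder endpoint
  .getOrCreate()

import spark.implicits._

// The data source infers the schema from the index mapping;
// .as[MyCaseClass] then maps columns onto the case class fields by name
val ds = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("spark/index")
  .as[MyCaseClass]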
Upvotes: 0
Reputation: 457
I wrote a function stringMapRddToDataset to do this. I feel like there should be a nicer way to do it overall, though. I'm also concerned about the efficiency of this solution, but I haven't tested it on large amounts of data yet.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Row, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}

private def mapToSparkRow(map: collection.Map[String, Any], orderedFields: List[StructField]): Row = {
  val orderedValues = orderedFields.map { field =>
    val columnValue = map.getOrElse(field.name, null)
    field.dataType match {
      // Recurse into nested documents so they become nested Rows; the null
      // guard keeps an absent nested field from throwing an NPE on recursion
      case nestedField: StructType if columnValue != null =>
        mapToSparkRow(columnValue.asInstanceOf[collection.Map[String, Any]], nestedField.toList)
      case _ => columnValue
    }
  }
  Row(orderedValues: _*)
}
def stringMapRddToDataset[T: Encoder](rdd: RDD[collection.Map[String, Any]])(
    implicit spark: SparkSession): Dataset[T] = {
  val encoder = implicitly[Encoder[T]]
  // Order each document's values to match the encoder's schema, then
  // rebuild a typed Dataset from the resulting rows
  val rddOfRows: RDD[Row] = rdd.map(mapToSparkRow(_, encoder.schema.toList))
  val df = spark.createDataFrame(rddOfRows, encoder.schema)
  df.as[T]
}
Upvotes: 0