Opal

Reputation: 84884

Getting values for keys in a Spark SQL query

I have the following DF schema:

scala> hotelsDF.printSchema()
root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- changeset: long (nullable = true)
 |-- uid: integer (nullable = true)
 |-- user_sid: binary (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: binary (nullable = true)
 |    |    |-- value: binary (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

I need to filter records which have a key equal to tourism and a value equal to hotel. I do it with the following SQL query:

sqlContext.sql("select * from nodes where array_contains(tags.key, binary('tourism')) and array_contains(tags.value, binary('hotel'))").show()

So far, so good.

Now, my question is: how can I select the value for a given tag key? The pseudo-query would be something like:

sqlContext.sql("select tags.tourism from nodes where array_contains(tags.key, binary('tourism')) and array_contains(tags.value, binary('hotel'))").show()

and return hotel for all matching entries.

Upvotes: 0

Views: 656

Answers (2)

Opal

Reputation: 84884

I solved it with a different approach. I added the following case classes:

case class Entry(
                  id: Long,
                  version: Int,
                  timestamp: Long,
                  changeset: Long,
                  uid: Int,
                  user_sid: Array[Byte],
                  tags: Array[Tag],
                  latitude: Double,
                  longitude: Double
                )

case class Tag(key: Array[Byte], value: Array[Byte])

case class Hotel(
                  id: Long,
                  stars: Option[String],
                  latitude: Double,
                  longitude: Double,
                  name: String,
                  rooms: Option[String]
                )

What's interesting (and caused me some problems) is that the equivalent of Spark's binary type is simply Array[Byte].
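For reference, a minimal sketch of the round trip between String and that binary representation (assuming the tag text is UTF-8 encoded; adjust the charset if your data differs):

import java.nio.charset.StandardCharsets

// Spark's binary type maps to Array[Byte]; decoding assumes UTF-8 tag text.
val tag   = Tag("tourism".getBytes(StandardCharsets.UTF_8), "hotel".getBytes(StandardCharsets.UTF_8))
val key   = new String(tag.key, StandardCharsets.UTF_8)   // "tourism"
val value = new String(tag.value, StandardCharsets.UTF_8) // "hotel"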

I then processed the DF in the following way:

def process(country: String) = {
    val dir = "/whatever/dir"
    val df = spark.read.parquet(s"$dir/$country/latest.node.parquet")

    df
      .as[Entry]
      .filter(e => e.tags != null && e.tags.nonEmpty)
      .filter(e =>
        e.tags.exists(t => new String(t.key).equalsIgnoreCase("tourism") && new String(t.value).equalsIgnoreCase("hotel"))
      )
      .map(e => Hotel(
        e.id,
        e.tags.find(findTag("stars")).map(t => new String(t.value)),
        e.latitude,
        e.longitude,
        e.tags.find(findTag("name")).map(t => new String(t.value)).orNull,
        e.tags.find(findTag("rooms")).map(t => new String(t.value))
      ))
      .repartition(1)
      .write
      .format("csv")
      .option("nullValue", null)
      .option("header", value = true)
      .option("delimiter", ",")
      .save(s"$dir/$country/csv")
  }
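Note that findTag is not defined above; presumably it is a small predicate factory along these lines (an assumption, not necessarily the exact code used):

// Assumed helper: builds a predicate that matches a Tag by key, case-insensitively.
def findTag(name: String)(tag: Tag): Boolean =
  new String(tag.key).equalsIgnoreCase(name)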

Upvotes: 0

jayrythium

Reputation: 767

You could explode the array and then filter (casting key and value to string, since they are binary):

import org.apache.spark.sql.functions.{col, explode}

hotelsDF.withColumn(
    "tags1",
    explode(col("tags"))
).drop(
    "tags"
).filter(
    // key and value are binary, so cast them to string before comparing
    (col("tags1.key").cast("string") === "tourism") && (col("tags1.value").cast("string") === "hotel")
).show()
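To then pull out the value for a given key, which is what the question asks, you could filter on the key alone and select the corresponding value from the exploded struct; a sketch along the same lines (the string cast is again because key and value are binary):

hotelsDF.withColumn("tags1", explode(col("tags")))
  .filter(col("tags1.key").cast("string") === "tourism")
  .select(col("id"), col("tags1.value").cast("string").as("tourism"))
  .show()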

Upvotes: 1
