Reputation: 84884
I have the following DataFrame schema:
scala> hotelsDF.printSchema()
root
|-- id: long (nullable = true)
|-- version: integer (nullable = true)
|-- timestamp: long (nullable = true)
|-- changeset: long (nullable = true)
|-- uid: integer (nullable = true)
|-- user_sid: binary (nullable = true)
|-- tags: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: binary (nullable = true)
| | |-- value: binary (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
I need to filter records which have a tag with key equal to "tourism" and value equal to "hotel". I do it with the following SQL query:
sqlContext.sql("select * from nodes where array_contains(tags.key, binary('tourism')) and array_contains(tags.value, binary('hotel'))").show()
So far, so good.
Now, my question is how I can select the value for a given tag key. A pseudo-query would be something like:
sqlContext.sql("select tags.tourism from nodes where array_contains(tags.key, binary('tourism')) and array_contains(tags.value, binary('hotel'))").show()
and it should return "hotel" for all entries.
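As a side note independent of Spark, the per-row lookup this pseudo-query is after can be sketched on already-decoded tag pairs; the tagValue helper and the sample data below are hypothetical, only there to show the intended semantics:

```scala
// Decoded (key, value) tag pairs for one node; sample data, for illustration only
val tags = Seq("tourism" -> "hotel", "name" -> "Grand Hotel", "rooms" -> "120")

// Return the value of the first tag whose key matches (case-insensitively),
// which is what "select tags.tourism" is meant to produce per row
def tagValue(tags: Seq[(String, String)], key: String): Option[String] =
  tags.collectFirst { case (k, v) if k.equalsIgnoreCase(key) => v }

println(tagValue(tags, "tourism")) // Some(hotel)
```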
Upvotes: 0
Views: 656
Reputation: 84884
I solved it with a different approach. I added the following case classes:
case class Entry(
  id: Long,
  version: Int,
  timestamp: Long,
  changeset: Long,
  uid: Int,
  user_sid: Array[Byte],
  tags: Array[Tag],
  latitude: Double,
  longitude: Double
)

case class Tag(key: Array[Byte], value: Array[Byte])

case class Hotel(
  id: Long,
  stars: Option[String],
  latitude: Double,
  longitude: Double,
  name: String,
  rooms: Option[String]
)
What's interesting (and caused me some problems) is that the Scala equivalent of Spark's binary type is just Array[Byte].
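A minimal pure-Scala sketch of that equivalence (no Spark required; UTF-8 as the charset is my assumption about the data):

```scala
import java.nio.charset.StandardCharsets

// What Spark hands you for a binary column cell: plain bytes
val raw: Array[Byte] = "tourism".getBytes(StandardCharsets.UTF_8)

// Decode back to a String for comparisons, as the filters in the code below do
val decoded = new String(raw, StandardCharsets.UTF_8)
println(decoded) // tourism

// Beware: Array[Byte] compares by reference, so compare decoded Strings
// (or use java.util.Arrays.equals), never the arrays with ==
println(raw == "tourism".getBytes(StandardCharsets.UTF_8)) // false
```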
and processed the DF in the following way:
// Matches a tag by key; this helper was used but not shown in the original
// post, so its definition is inferred from how it is called below
def findTag(key: String)(t: Tag): Boolean =
  new String(t.key).equalsIgnoreCase(key)

def process(country: String) = {
  val dir = "/whatever/dir"
  val df = spark.read.parquet(s"$dir/$country/latest.node.parquet")
  df
    .as[Entry]
    .filter(e => e.tags != null && e.tags.nonEmpty)
    .filter(e =>
      e.tags.exists(t =>
        new String(t.key).equalsIgnoreCase("tourism") &&
        new String(t.value).equalsIgnoreCase("hotel")
      )
    )
    .map(e => Hotel(
      e.id,
      e.tags.find(findTag("stars")).map(t => new String(t.value)),
      e.latitude,
      e.longitude,
      e.tags.find(findTag("name")).map(t => new String(t.value)).orNull,
      e.tags.find(findTag("rooms")).map(t => new String(t.value))
    ))
    .repartition(1)
    .write
    .format("csv")
    .option("nullValue", null)
    .option("header", value = true)
    .option("delimiter", ",")
    .save(s"$dir/$country/csv")
}
Upvotes: 0
Reputation: 767
You could explode the array and then filter:
// key and value are binary, so cast them to string before comparing;
// use === and && to build Column predicates in Scala
hotelsDF
  .withColumn("tags1", explode(col("tags")))
  .drop("tags")
  .filter(
    col("tags1.key").cast("string") === "tourism" &&
    col("tags1.value").cast("string") === "hotel"
  )
  .show()
Upvotes: 1