Reputation: 375
I want to order some Avro files that I retrieve from HDFS by timestamp.
The schema of my Avro files is:
headers : Map[String,String], body : String
Now the tricky part is that the timestamp is one of the key/value pairs in the map, so I have the timestamp contained in the map like this:
key_1 -> value_1, key_2 -> value_2, timestamp -> 1234567, key_n -> value_n
Note that the type of the values is String.
I created a case class to build my dataset with this schema:
case class Root(headers : Map[String,String], body: String)
Creation of my dataset:
val ds = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
I don't really know how to approach this problem, since I can only access the columns headers and body. How can I get at the nested values so that I can finally sort by timestamp?
I would like to do something like this:
ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")
One clarification: I don't want to lose any data from my initial dataset, just apply a sorting operation.
I use Spark 2.3.0.
Upvotes: 1
Views: 1229
Reputation: 1631
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.functions.desc
import java.sql.Timestamp

case class Nested(key_1: String, key_2: String, timestamp: Timestamp, key_n: String)
case class Root(headers: Nested, body: String)

implicit val rootCodec: Encoder[Root] = Encoders.product[Root]

val avroDS: Dataset[Root] = spark.read
  .format("com.databricks.spark.avro")
  .load(pathToHDFS)
  .as[Root]

// sort on the nested field, descending; orderBy on a Dataset[Root] returns a Dataset[Root]
val sortedDS: Dataset[Root] = avroDS.orderBy(desc("headers.timestamp"))
This code snippet would directly cast your Avro data to Dataset[Root]. You won't have to rely on importing spark.implicits._, and it would eliminate the step of casting your timestamp field to TimestampType. Internally, Spark's Timestamp datatype is implemented using java.sql.Timestamp.
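For comparison, here is a minimal sketch of the implicits-based alternative this avoids, assuming a SparkSession named spark is in scope:
import spark.implicits._ // implicit Encoder[Root] derivation instead of Encoders.product

val avroDS: Dataset[Root] = spark.read
  .format("com.databricks.spark.avro")
  .load(pathToHDFS)
  .as[Root] // the Encoder is resolved implicitly here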
Upvotes: 0
Reputation: 22439
The loaded Dataset should look something like the sample dataset below:
case class Root(headers : Map[String, String], body: String)
val ds = Seq(
Root(Map("k11"->"v11", "timestamp"->"1554231600", "k12"->"v12"), "body1"),
Root(Map("k21"->"v21", "timestamp"->"1554134400", "k22"->"v22"), "body2")
).toDS
You can simply look up the Map by the timestamp key, cast the value to Long, and perform an orderBy as follows:
ds.
withColumn("ts", $"headers"("timestamp").cast("Long")).
orderBy("ts").
show(false)
// +-------------------------------------------------+-----+----------+
// |headers |body |ts |
// +-------------------------------------------------+-----+----------+
// |[k21 -> v21, timestamp -> 1554134400, k22 -> v22]|body2|1554134400|
// |[k11 -> v11, timestamp -> 1554231600, k12 -> v12]|body1|1554231600|
// +-------------------------------------------------+-----+----------+
Note that $"headers"("timestamp")
is just the same as using the apply
column method (i.e. $"headers".apply("timestamp")
).
Alternatively, you could also use getItem to access the Map by key, like: $"headers".getItem("timestamp")
Upvotes: 1
Reputation: 411
You can use Scala's sortBy, which takes a function. I would advise you to collect the Dataset into a Vector (or another local collection) and declare the type of ds explicitly; that way you will see the applicable functions in IntelliJ (if you're using IntelliJ) and it will definitely compile. Bear in mind that collecting pulls all the data onto the driver, so this only makes sense if the dataset fits in memory.
See my example below, based on your code:
case class Root(headers : Map[String,String], body: String)
val ds: Vector[Root] = spark
  .read
  .format("com.databricks.spark.avro")
  .load(pathToHDFS)
  .as[Root]
  .collect() // bring the rows back to the driver as a local Array[Root]
  .toVector

val sorted = ds.sortBy(r => r.headers.get("timestamp").map(_.toLong)).reverse // parse the String epoch value for ordering
Edit: added reverse (assuming you want it descending). Inside the function that you pass as an argument, you would also put whatever processing the timestamp value needs (here, a simple toLong).
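If you need a real timestamp rather than a raw epoch number for the ordering, the processing step could look like this sketch (assuming, as in the question, that the String values are epoch seconds):
// hypothetical variant: order by java.time.Instant instead of a raw Long
val sortedByInstant = ds
  .sortBy(r => r.headers.get("timestamp").map(s => java.time.Instant.ofEpochSecond(s.toLong)))
  .reverse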
Upvotes: 2