Reputation: 3782
The following piece of code takes a lot of time to run on 4 GB of raw data in a cluster:
df.select("type", "user_pk", "item_pk","timestamp")
.withColumn("date",to_date(from_unixtime($"timestamp")))
.filter($"date" > "2018-04-14")
.select("type", "user_pk", "item_pk")
.map {
row => {
val typef = row.get(0).toString
val user = row.get(1).toString
val item = row.get(2).toString
(typef, user, item)
}
}
The output should be of type Dataset[(String,String,String)].
I guess that the map part takes a lot of time. Is there any way to optimize this piece of code?
Upvotes: 0
Views: 46
Reputation: 6385
You're creating a date column of Date type and then comparing it with a string? I'd assume some implicit conversion is happening underneath (for each row while filtering).
Instead, I'd convert that date string to a timestamp and do a numeric comparison (since you're using from_unixtime, I assume the timestamp is stored as System.currentTimeMillis or similar):
val timestamp = some_to_timestamp_func("2018-04-14")

df.select("type", "user_pk", "item_pk", "timestamp")
  .filter($"timestamp" > timestamp)
  // ... etc
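A minimal sketch of that idea, assuming the timestamp column holds epoch seconds (what from_unixtime expects) and a UTC session time zone; the cutoff computation below is just one way to fill in the some_to_timestamp_func placeholder:

import java.time.{LocalDate, ZoneOffset}
import df.sparkSession.implicits._

// Start of the day after 2018-04-14, so the strict ">" on dates in the
// original filter is preserved when comparing raw epoch seconds.
val cutoff = LocalDate.parse("2018-04-14")
  .plusDays(1)
  .atStartOfDay(ZoneOffset.UTC)
  .toEpochSecond

df.select("type", "user_pk", "item_pk", "timestamp")
  .filter($"timestamp" >= cutoff) // no per-row date conversion
  .select("type", "user_pk", "item_pk")

This keeps the predicate on the raw numeric column, so Spark doesn't have to build a date value for every row before filtering.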
Upvotes: 1
Reputation: 35219
I seriously doubt the map is the problem; nonetheless, I wouldn't use it at all and would go with the standard Dataset converter:
import df.sparkSession.implicits._

df.select("type", "user_pk", "item_pk", "timestamp")
  .withColumn("date", to_date(from_unixtime($"timestamp")))
  .filter($"date" > "2018-04-14")
  .select($"type" cast "string", $"user_pk" cast "string", $"item_pk" cast "string")
  .as[(String,String,String)]
Upvotes: 1