Markus

Reputation: 3782

Optimize a piece of code that uses a map transformation

The following piece of code takes a long time to run on 4 GB of raw data in a cluster:

df.select("type", "user_pk", "item_pk","timestamp")
      .withColumn("date",to_date(from_unixtime($"timestamp")))
      .filter($"date" > "2018-04-14")
      .select("type", "user_pk", "item_pk")
      .map {
        row => {
          val typef = row.get(0).toString
          val user = row.get(1).toString
          val item = row.get(2).toString
          (typef, user, item)
        }
      }

The output should be of type Dataset[(String,String,String)].

I guess the map part is what takes most of the time. Is there any way to optimize this piece of code?
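A rough way to test that guess (just a sketch: timed is an ad-hoc helper, and the same implicits as in the snippet above are assumed to be in scope) would be to time the job with and without the map stage:

// hypothetical sanity check: run each variant once and compare wall time
// (beware of caching effects between the two runs)
val base = df.select("type", "user_pk", "item_pk", "timestamp")
  .withColumn("date", to_date(from_unixtime($"timestamp")))
  .filter($"date" > "2018-04-14")
  .select("type", "user_pk", "item_pk")

def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

timed("without map")(base.count())
timed("with map") {
  base.map(r => (r.get(0).toString, r.get(1).toString, r.get(2).toString)).count()
}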

Upvotes: 0

Views: 46

Answers (2)

vvg

Reputation: 6385

You're creating a date column with Date type and then comparing it with a string? I'd assume some implicit conversion is happening underneath (for each row while filtering).
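You can check where that implicit cast actually lands by printing the extended plan (a quick sketch):

// the analyzed and optimized plans show where Catalyst inserts the Cast
// around the date comparison
df.withColumn("date", to_date(from_unixtime($"timestamp")))
  .filter($"date" > "2018-04-14")
  .explain(true)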

Instead, I'd convert that date string to a timestamp and do a numeric comparison (since you're using from_unixtime, I assume the timestamp is stored as System.currentTimeMillis-style epoch time or similar):

// compute the cutoff as epoch seconds once in the driver (UTC assumed)
val cutoff = java.time.LocalDate.parse("2018-04-14")
  .atStartOfDay(java.time.ZoneOffset.UTC).toEpochSecond

df.select("type", "user_pk", "item_pk", "timestamp")
  .filter($"timestamp" > cutoff)
// ... etc
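Alternatively, you can stay inside Spark SQL functions rather than computing the cutoff in driver code; unix_timestamp with an explicit format does the same job (a sketch, assuming the raw timestamp column holds epoch seconds, which is what from_unixtime expects):

import org.apache.spark.sql.functions.{lit, unix_timestamp}

// the literal-only expression is computed once by Catalyst, so each row
// still does just a numeric comparison on the raw column
df.select("type", "user_pk", "item_pk", "timestamp")
  .filter($"timestamp" > unix_timestamp(lit("2018-04-14"), "yyyy-MM-dd"))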

Upvotes: 1

Alper t. Turker

Reputation: 35219

I seriously doubt the map is the problem. Nonetheless, I wouldn't use it at all and would go with the standard Dataset converter:

import df.sparkSession.implicits._

df.select("type", "user_pk", "item_pk","timestamp")
  .withColumn("date",to_date(from_unixtime($"timestamp")))
  .filter($"date" > "2018-04-14")
  .select($"type" cast "string", $"user_pk" cast "string", $"item_pk" cast "string")
  .as[(String,String,String)]
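If you want to confirm where the cost goes, comparing the physical plans of the two variants is instructive (a sketch; mapped and casted are placeholder names for the map-based and as[...]-based Datasets):

// the map version wraps the lambda in DeserializeToObject /
// SerializeFromObject nodes; the as[...] version is plain casts
mapped.explain()
casted.explain()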

Upvotes: 1
