Bentech

Reputation: 498

Spark Scala. Using an external variable "dataframe" in a map

I have two dataframes,

val df1 = sqlContext.csvFile("/data/testData.csv")
val df2 = sqlContext.csvFile("/data/someValues.csv")


df1 =
startTime   name   cause1   cause2
15679       CCY    5        7
15683              2        5
15685              1        9
15690              9        6

df2 =
cause   description   causeType
3       Xxxxx         cause1
1       xxxxx         cause1
3       xxxxx         cause2
4       xxxxx
2       Xxxxx

and I want to apply a complex function getTimeCust to both cause1 and cause2 to determine a final cause, then look up the description of this final cause code in df2. I need a new DataFrame (or RDD) with the following columns:

startTime   name    cause   descriptionCause

My solution was

    val rdd2 = df1.map(row => {
      val (cause, descriptionCause) = getTimeCust(row.getInt(2), row.getInt(3), df2)
      Row(row(0), row(1), cause, descriptionCause)
    })

If I run the code above I get a NullPointerException because df2 is not visible inside the map.

The function getTimeCust(Int, Int, DataFrame) works well outside the map.

Upvotes: 0

Views: 1294

Answers (3)

Bentech

Reputation: 498

Thank you @Assaf. Thanks to your answer and the Spark UDF with dataframe approach, I have resolved this problem. The solution is:

    val getTimeCust = udf((cause1: Int, cause2: Int) => {
      var lastCause = 0
      var categoryCause = ""
      var descCause = ""
      lastCause = .............
      categoryCause = ........

      (lastCause, categoryCause)
    })

and then call the UDF as:

    val dfWithCause = df1.withColumn("df1_cause", getTimeCust($"cause1", $"cause2"))

And finally the join:

    val dfFinale = dfWithCause.join(df2,
      dfWithCause.col("df1_cause._1") === df2.col("cause") &&
        dfWithCause.col("df1_cause._2") === df2.col("causeType"),
      "outer")
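
To end up with the four columns listed in the question, a final select over dfFinale might look like the sketch below (the struct fields _1 and _2 come from the tuple returned by the UDF; column names are taken from the sample data):

    val result = dfFinale.select(
      $"startTime",
      $"name",
      $"df1_cause._1".as("cause"),
      $"description".as("descriptionCause"))
    result.show()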

Upvotes: 0

Assaf Mendelson

Reputation: 13001

Try something like this:

def f1(cause1: Int, cause2: Int): Int = ??? // some logic to calculate the cause

import org.apache.spark.sql.functions.udf
val dfCause = df1.withColumn("df1_cause", udf(f1 _)($"cause1", $"cause2"))
val dfJoined = dfCause.join(df2, dfCause("df1_cause") === df2("cause"))
dfJoined.select("cause", "description").show()
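
A minimal self-contained sketch of this approach, assuming cause1 and cause2 are integer columns; the max rule below is purely illustrative and stands in for the real cause logic:

    import org.apache.spark.sql.functions.udf
    import sqlContext.implicits._  // for the $"..." column syntax

    // Purely illustrative rule (hypothetical): keep the larger of the two cause codes
    val pickCause = udf((cause1: Int, cause2: Int) => math.max(cause1, cause2))

    val dfCause = df1.withColumn("df1_cause", pickCause($"cause1", $"cause2"))
    val dfJoined = dfCause.join(df2, dfCause("df1_cause") === df2("cause"))
    dfJoined.select("startTime", "name", "cause", "description").show()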

Upvotes: 0

puhlen

Reputation: 8529

Use df1.join(df2, <join condition>) to join your dataframes together, then select the fields you need from the joined dataframe.

You can't use Spark's distributed structures (RDD, DataFrame, etc.) in code that runs on an executor (such as inside a map).
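
A minimal sketch of that pattern with the column names from the question; the join condition here is purely illustrative, since the real condition would use the final cause computed by getTimeCust:

    // Illustrative only: join df1 to df2 on cause1 and keep the requested fields
    val joined = df1.join(df2, df1("cause1") === df2("cause"))
    joined.select("startTime", "name", "cause", "description").show()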

Upvotes: 2
