jnaour

Reputation: 299

Operation on Data Frame

I use the DataFrame API from Spark 1.3.

I would like to get the day of the week from a date in a DataFrame without losing the other columns of the DataFrame.

Before switching to the DataFrame API, I used Joda-Time inside a simple map to get it.

Right now, this solution works:

sqlContext.createDataFrame(myDataFrame.map(l => operationOnTheField(l)), myDataFrame.schema)

Is it possible to do the operation without going back to a map on an RDD[Row] and then creating a DataFrame from that RDD?

Upvotes: 7

Views: 8254

Answers (2)

skywalkerytx

Reputation: 108

Try this:

Table.select(Table("Otherkey"), MyUdf(Table("ColNeeded")).as("UdfTransformed"))

Here Table is your DataFrame, and MyUdf is a UDF that you define yourself.
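For the day-of-week case in the question, the function wrapped by MyUdf might look like the sketch below. It assumes the date column holds java.sql.Date values (DateType), which the question does not state, and it uses java.time (Java 8+) in place of the Joda-Time library the question mentions. DayOfWeekSketch and dayOfWeek are hypothetical names, not part of Spark.

```scala
import java.sql.Date
import java.time.format.TextStyle
import java.util.Locale

object DayOfWeekSketch {
  // Hypothetical helper: returns the English day-of-week name for a
  // java.sql.Date, or null when the input is null (e.g. a null cell).
  def dayOfWeek(d: Date): String =
    if (d == null) null
    else d.toLocalDate.getDayOfWeek.getDisplayName(TextStyle.FULL, Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 2015-03-13 fell on a Friday.
    println(dayOfWeek(Date.valueOf("2015-03-13"))) // prints "Friday"
  }
}
```

Under the same DateType assumption, you could wrap it with org.apache.spark.sql.functions.udf, e.g. val MyUdf = udf(DayOfWeekSketch.dayOfWeek _), and use it in the select() call above.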

Upvotes: 1

Spiro Michaylov

Reputation: 3571

You can use a combination of calling select() on the DataFrame and a user-defined function (UDF) to transform the column in question.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._

Define a case class to set up the example DataFrame:

private case class Cust(id: Integer, name: String, 
        sales: Double, discount: Double, state: String)

Then set up a SQLContext and create the DataFrame as follows:

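// Local-mode setup for the example; the app name and master are arbitrary.
val conf = new SparkConf().setAppName("DataFrameUdfExample").setMaster("local[4]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._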

val custs = Seq(
  Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
  Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
  Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
  Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
  Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
val customerDF = sc.parallelize(custs, 4).toDF()

Define a simple UDF that you'll use to transform the "discount" column.

val myFunc = udf {(x: Double) => x + 1}

Get the DataFrame's columns, plus a reference to the "discount" column that the UDF will be applied to.

val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")

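I'd like to find a "better" way to match the column, but the following works: apply the UDF to the matching column and leave the others as they were. Use as() to give the transformed column a new name, just because we can!

// Apply myFunc to the "discount" column (renamed "transformed"); keep every other column unchanged.
val mappedCols = cols.map(c =>
  if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)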
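If you'd rather match on names than compare Column.toString() output, one option is to walk the names returned by customerDF.columns directly. The sketch below is a hypothetical generic helper (replaceColumn is not a Spark API), written in plain Scala so it runs without Spark; plain strings stand in for Column values.

```scala
object ColumnMatchSketch {
  // Hypothetical generic helper: walk the column names in order, apply
  // `transform` to the column named `target`, and keep every other column.
  // With Spark, C would be org.apache.spark.sql.Column.
  def replaceColumn[C](names: Seq[String], target: String)
                      (get: String => C)
                      (transform: C => C): Seq[C] =
    names.map(n => if (n == target) transform(get(n)) else get(n))

  def main(args: Array[String]): Unit = {
    // Strings stand in for Columns so this runs without a SparkContext.
    val out = replaceColumn(Seq("id", "name", "discount"), "discount")((n: String) => n)(c => s"myFunc($c)")
    println(out.mkString(", ")) // prints "id, name, myFunc(discount)"
  }
}
```

Assuming the Spark 1.3 DataFrame API used above, the Spark call might then be customerDF.select(ColumnMatchSketch.replaceColumn(customerDF.columns, "discount")((n: String) => customerDF(n))(c => myFunc(c).as("transformed")): _*). Currying the parameters lets Scala infer C from get before it types the transform lambda.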

Use select() to produce the new DataFrame:

val newDF = customerDF.select(mappedCols:_*)

You've changed

id name            sales    discount state
1  Widget Co       120000.0 0.0      AZ   
2  Acme Widgets    410500.0 500.0    CA   
3  Widgetry        410500.0 200.0    CA   
4  Widgets R Us    410500.0 0.0      CA   
5  Ye Olde Widgete 500.0    0.0      MA   

into

id name            sales    transformed state
1  Widget Co       120000.0 1.0         AZ   
2  Acme Widgets    410500.0 501.0       CA   
3  Widgetry        410500.0 201.0       CA   
4  Widgets R Us    410500.0 1.0         CA   
5  Ye Olde Widgete 500.0    1.0         MA   

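You can find the full example source code here. You can make it simpler if you're not fussy about an exact column replacement, for example by appending the transformed column with withColumn() instead of rebuilding the full column list.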

Upvotes: 10
