Reputation: 299
I use the DataFrame API from Spark 1.3.
I would like to derive the day of the week from a date column in a DataFrame while keeping all the other columns.
I used to do this with Joda-Time in a simple map before moving to the DataFrame API.
Right now, this solution works:
sqlContext.createDataFrame(myDataFrame.map(l => operationOnTheField(l)), myDataFrame.schema)
Is it possible to do the operation without going back to a map on an RDD[Row]
and then creating a DataFrame from that RDD?
Upvotes: 7
Views: 8254
Reputation: 108
Try this:
Table.select(Table("Otherkey"), MyUdf(Table("ColNeeded")).as("UdfTransformed"))
Here MyUdf is a UDF that you define yourself.
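As a rough sketch of what such a UDF could look like for the day-of-week case in the question: the snippet below assumes the date column holds ISO-8601 strings such as "2015-04-01" and that Java 8's java.time is available (Joda-Time would work similarly). The names dayOfWeek, dayOfWeekUdf and the "date" column are made up for illustration.

```scala
import java.time.LocalDate
import org.apache.spark.sql.functions.udf

// Assumption: dates are stored as ISO-8601 strings ("yyyy-MM-dd").
// Returns the English day name in upper case, e.g. "WEDNESDAY".
def dayOfWeek(isoDate: String): String =
  LocalDate.parse(isoDate).getDayOfWeek.toString

// Wrap the plain function so it can be applied to a Column.
val dayOfWeekUdf = udf(dayOfWeek _)

// Hypothetical usage, keeping every existing column and appending one:
// val withDay = myDataFrame.withColumn("dayOfWeek", dayOfWeekUdf(myDataFrame("date")))
```

Keeping the date logic in an ordinary function makes it easy to check on its own before wrapping it in udf.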
Upvotes: 1
Reputation: 3571
You can combine a call to select() on the DataFrame with a user-defined function (UDF) to transform the column in question.
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._
Define a case class to set up the example DataFrame.
private case class Cust(id: Integer, name: String,
sales: Double, discount: Double, state: String)
Then set up a SQLContext and create the DataFrame as follows:
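// Example local setup (app name and master are placeholders); in spark-shell, sc and sqlContext already exist.
val conf = new SparkConf().setAppName("DataFrameUdfExample").setMaster("local[4]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._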
val custs = Seq(
Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
val customerDF = sc.parallelize(custs, 4).toDF()
Define a simple UDF that you'll use to transform the "discount" column.
val myFunc = udf {(x: Double) => x + 1}
Get the columns, having applied the UDF to the "discount" column and leaving the others as they were.
val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")
I'd like to find a "better" way to match the column, but the following works. Use as() to give the column a new name, just because we can!
val mappedCols = cols.map(c =>
if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)
Use select() to produce the new DataFrame:
val newDF = customerDF.select(mappedCols:_*)
You've changed
id  name             sales     discount  state
1   Widget Co        120000.0  0.0       AZ
2   Acme Widgets     410500.0  500.0     CA
3   Widgetry         410500.0  200.0     CA
4   Widgets R Us     410500.0  0.0       CA
5   Ye Olde Widgete  500.0     0.0       MA
into
id  name             sales     transformed  state
1   Widget Co        120000.0  1.0          AZ
2   Acme Widgets     410500.0  501.0        CA
3   Widgetry         410500.0  201.0        CA
4   Widgets R Us     410500.0  1.0          CA
5   Ye Olde Widgete  500.0     1.0          MA
You can find the full example source code here. You can make it simpler if you're not fussy about an exact column replacement.
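For illustration only, here is one way the column matching might be done on the name string instead of Column.toString, along with the simpler "append a column" variant. This is a sketch, not the linked source: the cut-down data, the app name, and the names plusOne, replaced and appended are invented for the example, and in spark-shell you would skip the context setup because sc and sqlContext already exist.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

// Hypothetical local setup and a cut-down version of the example data.
val sc = new SparkContext(
  new SparkConf().setAppName("column-by-name").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq((1, 0.0, "AZ"), (2, 500.0, "CA")))
  .toDF("id", "discount", "state")
val plusOne = udf { (x: Double) => x + 1 }

// Option 1: replace the column in place, matching on the column name
// rather than comparing Column.toString values.
val target = "discount"
val replaced = df.select(df.columns.map { name =>
  if (name == target) plusOne(df(name)).as("transformed") else df(name)
}: _*)

// Option 2: keep every original column and append the transformed one.
val appended = df.withColumn("transformed", plusOne(df("discount")))
```

Option 1 preserves the column order and swaps "discount" for "transformed"; Option 2 is shorter but leaves the original "discount" column in the result.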
Upvotes: 10