Reputation: 231
I have a dataframe like this:
val df = Seq(
("a", Seq(2.0)),
("a", Seq(1.0)),
("a", Seq(0.5)),
("b", Seq(24.0)),
("b", Seq(12.5)),
("b", Seq(6.4)),
("b", Seq(3.2)),
("c", Seq(104.0)),
("c", Seq(107.4))
).toDF("key", "value")
I need to apply an algorithm that takes a DataFrame object as input, on distinct groups. To make this clearer, assume that I have to apply StandardScaler scaling by groups.
In pandas I would do something like this (many type changes in the process):
from sklearn.preprocessing import StandardScaler
df.groupby('key') \
  .value \
  .transform(lambda x: StandardScaler() \
    .fit_transform(x \
      .values \
      .reshape(-1, 1)) \
    .reshape(-1))
I need to do this in Scala because the algorithm I actually have to use is not the scaler but something else built in Scala.
So far I've tried to do something like this:
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
def f(X: org.apache.spark.sql.Column): org.apache.spark.sql.Column = {
  val scaler = new StandardScaler()
    .setInputCol("value")
    .setOutputCol("scaled")
  val output = scaler.fit(X)("scaled")
  (output)
}
df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))
but of course it gives me an error:
command-144174313464261:21: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: org.apache.spark.sql.Dataset[_]
       val output = scaler.fit(X)("scaled")
So I'm trying to transform a single Column object into a DataFrame object, without success. How do I do it?
If it's not possible, is there any workaround to solve this?
UPDATE 1
It seems I made some mistakes in the code; I tried to fix it (I think I got it right this time):
val df = Seq(
("a", 2.0),
("a", 1.0),
("a", 0.5),
("b", 24.0),
("b", 12.5),
("b", 6.4),
("b", 3.2),
("c", 104.0),
("c", 107.4)
).toDF("key", "value")
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.sql.expressions.Window
def f(X: org.apache.spark.sql.DataFrame): org.apache.spark.sql.Column = {
  val assembler = new VectorAssembler()
    .setInputCols(Array("value"))
    .setOutputCol("feature")
  val scaler = new StandardScaler()
    .setInputCol("feature")
    .setOutputCol("scaled")
  val pipeline = new Pipeline()
    .setStages(Array(assembler, scaler))
  val output = pipeline.fit(X).transform(X)("scaled")
  (output)
}
df.withColumn("scaled_values", f(df).over(Window.partitionBy("key")))
I still get an error:
org.apache.spark.sql.AnalysisException: Expression 'scaled#1294' not supported within a window function.;;
I am not sure about the reason for this error. I tried aliasing the column, but it doesn't seem to work.
Upvotes: 1
Views: 392
Reputation: 27373
So I'm trying to transform a single Column object into a DataFrame object, without success. How do I do it?
You can't. A Column just references a column of a DataFrame; it does not contain any data, and it is not a data structure like a DataFrame.
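For illustration, assuming the df from your question is in scope (the variable names here are just for the example):
// a Column is only an unevaluated expression referring to a column name
val valueCol: org.apache.spark.sql.Column = df("value")
// to get actual data you have to select it from a DataFrame
val valueDf: org.apache.spark.sql.DataFrame = df.select(df("value"))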
Your f function will also not work like this. If you want to create a custom function to be used with Window, then you need a UDAF (user-defined aggregate function), which is pretty hard...
In your case, I would do a groupBy on key, collect_list of your values, then apply a UDF to do the scaling. Note that this only works if the data per key is not too large (larger than what fits into one executor), otherwise you need a UDAF.
Here is an example:
// example scala method, scale to 0-1
def myScaler(data: Seq[Double]) = {
  val mi = data.min
  val ma = data.max
  data.map(x => (x - mi) / (ma - mi))
}
val udf_myScaler = udf(myScaler _)
df
  .groupBy($"key")
  .agg(
    collect_list($"value").as("values")
  )
  // zip the original and the scaled values, then explode back to one row per value
  .select($"key", explode(arrays_zip($"values", udf_myScaler($"values"))))
  // arrays_zip keeps the plain column under its name ("values") and the UDF result under its position ("1")
  .select($"key", $"col.values", $"col.1".as("values_scaled"))
  .show()
gives:
+---+------+-------------------+
|key|values| values_scaled|
+---+------+-------------------+
| c| 104.0| 0.0|
| c| 107.4| 1.0|
| b| 24.0| 1.0|
| b| 12.5|0.44711538461538464|
| b| 6.4|0.15384615384615385|
| b| 3.2| 0.0|
| a| 2.0| 1.0|
| a| 1.0| 0.3333333333333333|
| a| 0.5| 0.0|
+---+------+-------------------+
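If your algorithm really needs a whole DataFrame per group (like the Pipeline in your update), a possible workaround is to split the DataFrame by key, fit/transform each part on its own and union the results. This only makes sense for a small number of keys, because it runs a separate Spark job per key. A rough sketch, using your updated df with a plain Double value column (the scalePerKey helper and its names are just for illustration):
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// hypothetical helper, not part of the code above
def scalePerKey(df: DataFrame): DataFrame = {
  val pipeline = new Pipeline().setStages(Array(
    new VectorAssembler().setInputCols(Array("value")).setOutputCol("feature"),
    new StandardScaler().setInputCol("feature").setOutputCol("scaled")
  ))
  // one small DataFrame per key, scaled independently, then unioned back together
  val keys = df.select("key").distinct().collect().map(_.getString(0))
  keys
    .map { k =>
      val part = df.filter(col("key") === k)
      pipeline.fit(part).transform(part)
    }
    .reduce(_ union _)
}
scalePerKey(df).select("key", "value", "scaled").show()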
Upvotes: 2