Reputation: 75
I want to achieve the below in Scala for a Spark DataFrame.
I am not sure how to loop through the columns, selecting each column together with the flag variable on each iteration of the loop. What I tried is:
for (a <- colnames) {
  val dat1 = data.filter($"cust_flag".isin("1")).select(a)
  val dat0 = data.filter($"cust_flag".isin("0")).select(a)
  val m0 = dat1.select(avg(a)).asInstanceOf[Double]
  val m1 = dat0.select(avg(a)).asInstanceOf[Float]
  val stdev = data.agg(stddev(a)).asInstanceOf[Float]
  val rpb = ((m1 - m0) / stdev) * p * q
  println(rpb)
}
Now I am getting an error:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.Dataset cannot be cast to java.lang.Float
Upvotes: 0
Views: 1545
Reputation: 306
I would suggest you use df.selectExpr(), which can take a sequence of strings:
val expressions = Seq("avg(col1) as avg_col1", "stddev(col1) as sd_col1", "...")
df.selectExpr(expressions:_*)
You can do almost everything with that function by building the array of expressions however you wish in a for loop.
In any case, I suggest you show us an example of the expected input/output (the code you wrote doesn't tell us much).
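For example, a minimal sketch of building the expression list in a loop (assuming colnames holds the names of the numeric columns and df is your DataFrame, as in the question):
// build one avg/stddev expression pair per column, then evaluate
// them all in a single select instead of one Spark job per column
val expressions = colnames.flatMap { c =>
  Seq(s"avg($c) as avg_$c", s"stddev($c) as sd_$c")
}
val stats = df.selectExpr(expressions: _*)
stats.show(false)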
Upvotes: 0
Reputation: 2431
As I understand it, you are trying to get the average of each column according to the flag, and the standard deviation of each column irrespective of the flag. After that you apply the formula and calculate rpb.
On the basis of the same logic, I have taken sample data and written code without a loop. It will be faster than the loop logic you are using. Spark does not do well with loop logic, so try to get all the required data into one row (such as avg0, avg1 and StdDev in the example below) and then process it horizontally or in a batch.
Please note, as I commented above, I didn't understand the values of p and q, so I have ignored them in the final output DataFrame logic. You can add them directly if they are variables declared beforehand.
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._
scala> val df = Seq(
| ("121", "442", "512","1"),
| ("134", "434", "752","0"),
| ("423", "312", "124","1"),
| ("432", "677", "752","0"),
| ("332", "424", "111","1")).
| toDF("col1","col2","col3","cust_flag").
| withColumn("col1", $"col1".cast(DoubleType)).
| withColumn("col2", $"col2".cast(DoubleType)).
| withColumn("col3", $"col3".cast(DoubleType))
scala> df.show
+-----+-----+-----+---------+
| col1| col2| col3|cust_flag|
+-----+-----+-----+---------+
|121.0|442.0|512.0| 1|
|134.0|434.0|752.0| 0|
|423.0|312.0|124.0| 1|
|432.0|677.0|752.0| 0|
|332.0|424.0|111.0| 1|
+-----+-----+-----+---------+
scala> val colSeq = Seq("col1", "col2", "col3")
scala> val aggdf = colSeq.map(c => {
| df.groupBy("cust_flag").agg( lit(c).alias("columnName"), avg(c).cast("Decimal(14,2)").alias("avg"))
| })
scala> val devdf = colSeq.map(c => {
| df.agg( lit(c).alias("columnName"), stddev(c).cast("Decimal(14,2)").alias("StdDev"))
| })
scala> val avgDF = aggdf.reduce(_ union _)
scala> val stdDevDF = devdf.reduce(_ union _)
scala> val finalAvgDF = avgDF.filter(col("cust_flag") === 1).alias("1").join(avgDF.filter(col("cust_flag") === 0).alias("0"), List("columnName")).select(col("columnName"), col("1.avg").alias("avg1"), col("0.avg").alias("avg0"))
scala> val outDF = finalAvgDF.join(stdDevDF, List("columnName"))
scala> outDF.show()
+----------+------+------+------+
|columnName| avg1| avg0|StdDev|
+----------+------+------+------+
| col1|292.00|283.00|152.07|
| col2|392.67|555.50|133.48|
| col3|249.00|752.00|319.16|
+----------+------+------+------+
// apply your final formula to get rpb
scala> outDF.withColumn("rpb", (col("avg1") - col("avg0"))/col("StdDev")).show
+----------+------+------+------+--------------------+
|columnName| avg1| avg0|StdDev| rpb|
+----------+------+------+------+--------------------+
| col1|292.00|283.00|152.07| 0.05918327086210298|
| col2|392.67|555.50|133.48|-1.21988312855858556|
| col3|249.00|752.00|319.16|-1.57601203158290513|
+----------+------+------+------+--------------------+
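If p and q are already declared as numeric variables, you can multiply them in directly, e.g. (a sketch; the values of p and q below are hypothetical placeholders):
scala> val p = 0.6   // hypothetical placeholder value
scala> val q = 0.4   // hypothetical placeholder value
scala> outDF.withColumn("rpb", ((col("avg1") - col("avg0")) / col("StdDev")) * p * q).show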
Upvotes: 0
Reputation: 4957
There are direct functions for mean() and stddev().
Create two filtered datasets, i.e. one for flag = 0 and one for flag = 1:
val dfcol0 = df.filter(df("cust_flag") === "0")
val dfcol1 = df.filter(df("cust_flag") === "1")
Now use the stddev() and mean() functions to get what is required:
dfcol0.select(stddev("colname")).show(false)
dfcol0.select(mean("colname")).show(false)
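Putting the pieces together, a minimal sketch of the full loop (assuming the data columns are numeric Doubles and ignoring the p*q factor, as in the other answer; the column names are illustrative):
import org.apache.spark.sql.functions.{mean, stddev}

// pull each statistic out as a Double via first().getDouble(0),
// rather than casting the Dataset itself (the cause of the original
// ClassCastException)
for (a <- Seq("col1", "col2", "col3")) {
  val m1 = dfcol1.select(mean(a)).first().getDouble(0)
  val m0 = dfcol0.select(mean(a)).first().getDouble(0)
  val sd = df.select(stddev(a)).first().getDouble(0)
  val rpb = (m1 - m0) / sd
  println(s"$a: $rpb")
}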
Upvotes: 1
Reputation: 13528
To create a column with a given name from a string, a simple way is to use:
import org.apache.spark.sql.{functions => sf}
df.select(sf.col(colName))
You can combine this with control logic (your loop) as you see fit.
If you want to know what columns are in the dataframe, use df.columns.
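For instance, a minimal sketch combining the two (assuming df has a cust_flag column plus data columns, as in the question):
import org.apache.spark.sql.{functions => sf}

// iterate over every column except the flag, selecting each by name
for (colName <- df.columns if colName != "cust_flag") {
  df.select(sf.col(colName), sf.col("cust_flag")).show(5)
}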
Upvotes: 1