cwl

Reputation: 531

Spark: normalize each row of a DataFrame

I have a Spark DataFrame which looks like the following:

df.show()
+------+------+------+
|  col1|  col2|  col3|
+------+------+------+
|   5.0|   5.0|   0.0|
|   2.0|   3.0|   5.0|
|   4.0|   1.0|  10.0|
+------+------+------+

I would like to normalize each individual row, such that after the operation the new columns look like:

+--------+--------+--------+
|new_col1|new_col2|new_col3|
+--------+--------+--------+
|     0.5|     0.5|     0.0|
|     0.2|     0.3|     0.5|
|0.266667|0.066667|0.666667|
+--------+--------+--------+

More formally, the operation I would like to apply is:

for each row,

    new_col_i = col_i / (col_1 + col_2 + col_3)
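
For the third row, for instance, the total is 4.0 + 1.0 + 10.0 = 15.0, so new_col_1 = 4.0 / 15.0 ≈ 0.266667, new_col_2 = 1.0 / 15.0 ≈ 0.066667 and new_col_3 = 10.0 / 15.0 ≈ 0.666667.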

I will need to do this programmatically instead of listing all the columns, as my DataFrame has many columns.

Current solution:

The solution I currently have is to create a column holding the row-wise sum of all the entries, and then divide each column by that sum column.

import org.apache.spark.sql.functions.col

// helper column with the row-wise sum, then divide each column by it
var newDF = df.withColumn("total", df.columns.map(c => col(c)).reduce((c1, c2) => c1 + c2))

for (c <- Array("col1", "col2", "col3")) {
    newDF = newDF.withColumn("normalized_" + c, col(c).divide(col("total")))
}
newDF.show()

+----+----+----+-----+-------------------+-------------------+------------------+
|col1|col2|col3|total|    normalized_col1|    normalized_col2|   normalized_col3|
+----+----+----+-----+-------------------+-------------------+------------------+
| 5.0| 5.0| 0.0| 10.0|                0.5|                0.5|               0.0|
| 2.0| 3.0| 5.0| 10.0|                0.2|                0.3|               0.5|
| 4.0| 1.0|10.0| 15.0|0.26666666666666666|0.06666666666666667|0.6666666666666666|
+----+----+----+-----+-------------------+-------------------+------------------+

Any alternative to make the code more succinct?

Upvotes: 2

Views: 3431

Answers (2)

Tzach Zohar

Reputation: 37832

Your solution is correct and can't be improved much. You can get rid of the non-idiomatic use of var by replacing the for-loop with a foldLeft, and use a bit more syntactic sugar throughout, but other than that it stays the same:

import org.apache.spark.sql.functions.col
import spark.implicits._  // for the $"..." column syntax (assumes a SparkSession named spark)

// helper column holding the row-wise sum of all original columns
val withTotal = df.withColumn("total", df.columns.map(col).reduce(_ + _))

// add one normalized column per input column, then drop the originals and the helper
val result = df.columns.foldLeft(withTotal) {
  (tmp, c) => tmp.withColumn(s"new_$c", $"$c" / $"total")
}
  .drop(df.columns: _*)
  .drop("total")

Upvotes: 5

Fisseha Berhane

Reputation: 2653

For anyone looking for how to do row-wise normalization in PySpark, the code below worked for me:

from pyspark.sql.types import StructType, StructField, DoubleType

# add a row-wise total column, then divide each value by it (scaled to percentages)
new_df = df.withColumn('total', sum(df[col] for col in df.columns))
my_schema = StructType([StructField(col, DoubleType(), True) for col in df.columns])
result = new_df.rdd.map(lambda x: [100.00 * x[i] / x[len(x) - 1] for i in range(len(x) - 1)]).toDF(schema=my_schema)
result.show()

+------------------+-----------------+-----------------+
|              col1|             col2|             col3|
+------------------+-----------------+-----------------+
|              50.0|             50.0|              0.0|
|              20.0|             30.0|             50.0|
|26.666666666666668|6.666666666666667|66.66666666666667|
+------------------+-----------------+-----------------+
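
Note that this scales each value by 100, so every row sums to 100 rather than 1 as in the question's expected output; drop the 100.00 factor to get the 0-to-1 normalization.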

Upvotes: 0
