user2811630
user2811630

Reputation: 475

Compare values of multiple columns based on column identifier

I have two dataframes.

The first one looks like this (number of channel will vary depending on the Type) This dataframe stores the type of device and the value for each channel.

+-----+----------+----------+
| Type|X_ChannelA|Y_ChannelB|
+-----+----------+----------+
|TypeA|        11|        20|
+-----+----------+----------+

The second dataframe is imported from an csv and is generated by me.

Right now I have this format (Can be changed to anything needed)

+-----+--------------+--------------+--------------+--------------+
| Type|X_ChannelA_min|X_ChannelA_max|Y_ChannelB_min|Y_ChannelB_max|
+-----+--------------+--------------+--------------+--------------+
|TypeA|             8|            12|             9|            13|
+-----+--------------+--------------+--------------+--------------+

Now I want to compare the actual Channel values to the min and max ones and create a new column with _status which contains a one if the value is in between min and max and a zero if it exceeds either min or max.

Wanted result for this examples

+-----+----------+----------+-----------------+-----------------+
| Type|X_ChannelA|Y_ChannelB|X_ChannelA_status|Y_ChannelB_status|
+-----+----------+----------+-----------------+-----------------+
|TypeA|        11|        20|                1|                0|
+-----+----------+----------+-----------------+-----------------+

Code is here:

    val df_orig = spark.sparkContext.parallelize(Seq(
      ("TypeA", 11, 20)
    )).toDF("Type", "X_ChannelA", "Y_ChannelB")

    val df_def = spark.sparkContext.parallelize(Seq(
      ("TypeA", 8, 12, 9, 13)
    )).toDF("Type", "X_ChannelA_min", "X_ChannelA_max", "Y_ChannelB_min", "Y_ChannelB_max")

I have tried a few different things already without success.

Like creating columns by getting a string array of all channels and then creating the columns with

val pattern = """[XYZP]_Channel.*"""
val fieldNames = df_orig.schema.fieldNames.filter(_.matches(pattern))
fieldNames.foreach(x => df.withColumn(s"${x}_status", <compare logic comes here>)

My next idea was to join df_orig with df_def and then add channel_value, channel_min, channel_max with concat_ws into a single column, compare the values with the compare logic and write the result into the column

+-----+----------+----------+----------------+----------------+-------------+...
| Type|X_ChannelA|Y_ChannelB|X_ChannelA_array|Y_ChannelB_array|X_ChannelA_st|
+-----+----------+----------+----------------+----------------+-------------+...
|TypeA|        11|        20|     [11, 8, 12]|     [20, 9, 13]|            1|
+-----+----------+----------+----------------+----------------+-------------+...

If there is a simpler solution it would be nice to get a push into the right direction.

Edit: If my description was unclear basically what I am looking for is: what i am looking for is

foreach channel in channellist (
    ds.withColumn("<channel>_status", when($"<channel>" < $"<channel>_min" || $"<channel>" > $"<channel>_max"), 1).otherwise 0)
)

Edit: I found a solution which is:

val df_joined = df_orig.join(df_def, Seq("Type"))
val pattern = """[XYZP]_Channel.*"""
val fieldNames = df_orig.schema.fieldNames.filter(_.matches(pattern))
val df_newnew = df_joined.select(col("*") +: (fieldNames.map(c => when(col(c) <= col(c+"_min") || col(c) >= col(c+"_max"), 1).otherwise(0).as(c+"_status))): _*)

Upvotes: 1

Views: 2053

Answers (1)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41987

join is the way to go. And you have to utilize when function appropriately as below

import org.apache.spark.sql.functions._
df_orig.join(df_def, Seq("Type"), "left")
  .withColumn("X_ChannelA_status", when(col("X_ChannelA") >= col("X_ChannelA_min") && col("X_ChannelA") <= col("X_ChannelA_max"), 1).otherwise(0))
  .withColumn("Y_ChannelB_status", when(col("Y_ChannelB") >= col("Y_ChannelB_min") && col("Y_ChannelB") <= col("Y_ChannelB_max"), 1).otherwise(0))
  .select("Type", "X_ChannelA", "Y_ChannelB", "X_ChannelA_status", "Y_ChannelB_status")

You should get your desired output as

+-----+----------+----------+-----------------+-----------------+
|Type |X_ChannelA|Y_ChannelB|X_ChannelA_status|Y_ChannelB_status|
+-----+----------+----------+-----------------+-----------------+
|TypeA|11        |20        |1                |0                |
+-----+----------+----------+-----------------+-----------------+

Updated

If you have more columns in your channel dataframe and if you don't want to hard code all the columns as mentioned above then you can benefit from foldLeft (a powerful function in scala)

But before that you will have to decide the columns you want to iterate (i.e. the channels)

val df_orig_Columns = df_orig.columns
val columnsToIterate = df_orig_Columns.toSet - "Type"

Then after you join them, use foldLeft to generalize the above withColumn process

val joinedDF = df_orig.join(df_def, Seq("Type"), "left")

import org.apache.spark.sql.functions._
val finalDF = columnsToIterate.foldLeft(joinedDF){(tempDF, colName) => tempDF.withColumn(colName+"_status", when(col(colName) >= col(colName+"_min") && col(colName) <= col(colName+"_max"), 1).otherwise(0))}

Finally you select the necessary columns as

val finalDFcolumns = df_orig_Columns ++ columnsToIterate.map(_+"_status")
finalDF.select(finalDFcolumns.map(col): _*)

I guess thats it. Hope its more than helpful

Upvotes: 2

Related Questions