CoolGoose

Reputation: 160

Spark DataFrame: extracting columns based on dynamically selected columns

Schema of the input dataframe:

- employeeKey (int)
- employeeTypeId (string)
- loginDate (string)
- employeeDetailsJson (string), for example:

{"Grade":"100","ValidTill":"2021-12-01","Supervisor":"Alex","Vendor":"technicia","HourlyRate":29}

For permanent employees, some attributes are available and some are not; the same applies to contract employees.

So I am looking for an efficient way to build the dataframe from only the selected columns, as opposed to transforming all columns and then selecting the ones I need.

Also, please advise whether this is the best way to extract values from the JSON string by key. Since the attributes in the string are dynamic, I cannot build a StructType schema for it, so I am using good old get_json_object.

(Spark 2.4.5 now; will move to Spark 3 in the future)

val dfSelectColumns = List("Employee-Key", "Employee-Type", "Login-Date", "cont.Vendor-Name", "cont.Hourly-Rate")

//val dfSelectColumns = List("Employee-Key", "Employee-Type", "Login-Date", "perm.Level", "perm.Validity", "perm.Supervisor")

val resultDF = inputDF
  .withColumn("Employee-Key", col("employeeKey"))
  .withColumn("Employee-Type", when(col("employeeTypeId") === 1, "Permanent")
    .when(col("employeeTypeId") === 2, "Contractor")
    .otherwise("unknown"))
  .withColumn("Login-Date", to_utc_timestamp(to_timestamp(col("loginDate"), "yyyy-MM-dd'T'HH:mm:ss"), "America/Chicago"))
  .withColumn("perm.Level", get_json_object(col("employeeDetailsJson"), "$.Grade"))
  .withColumn("perm.Validity", get_json_object(col("employeeDetailsJson"), "$.ValidTill"))
  .withColumn("perm.Supervisor", get_json_object(col("employeeDetailsJson"), "$.Supervisor"))
  .withColumn("cont.Vendor-Name", get_json_object(col("employeeDetailsJson"), "$.Vendor"))
  .withColumn("cont.Hourly-Rate", get_json_object(col("employeeDetailsJson"), "$.HourlyRate"))
  // backticks are required when selecting by name because the names contain dots
  .select(dfSelectColumns.map(c => col(s"`$c`")): _*)
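
For the "only selected columns" part, one possible refactoring (a sketch, untested; the expressions simply mirror the code above) is to keep every target column as a named expression in a Map and select just the requested ones, so unused expressions never enter the plan:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Sketch: name -> expression lookup. Only the entries listed in dfSelectColumns
// are evaluated, and no backticks are needed because we select by Column.
val allColumns: Map[String, Column] = Map(
  "Employee-Key"     -> col("employeeKey"),
  "Employee-Type"    -> when(col("employeeTypeId") === 1, "Permanent")
                          .when(col("employeeTypeId") === 2, "Contractor")
                          .otherwise("unknown"),
  "Login-Date"       -> to_utc_timestamp(to_timestamp(col("loginDate"), "yyyy-MM-dd'T'HH:mm:ss"), "America/Chicago"),
  "perm.Level"       -> get_json_object(col("employeeDetailsJson"), "$.Grade"),
  "perm.Validity"    -> get_json_object(col("employeeDetailsJson"), "$.ValidTill"),
  "perm.Supervisor"  -> get_json_object(col("employeeDetailsJson"), "$.Supervisor"),
  "cont.Vendor-Name" -> get_json_object(col("employeeDetailsJson"), "$.Vendor"),
  "cont.Hourly-Rate" -> get_json_object(col("employeeDetailsJson"), "$.HourlyRate")
)

val resultDF2 = inputDF.select(dfSelectColumns.map(name => allColumns(name).as(name)): _*)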

Upvotes: 0

Views: 475

Answers (2)

skybutter

Reputation: 116

I see that you have two record shapes, one for Permanent and another for Contractor, so you can define two schemas.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF and the $-column syntax (assumes a SparkSession named spark)

val schemaBase = new StructType()
  .add("Employee-Key", IntegerType)
  .add("Employee-Type", StringType)
  .add("Login-Date", DateType)
val schemaPerm = schemaBase.add("Level", IntegerType).add("Validity", StringType)    // Permanent attributes
val schemaCont = schemaBase.add("Vendor", StringType).add("HourlyRate", DoubleType)  // Contractor attributes

Then you can use the two schemas to load the data into dataframes.
For permanent employees:

val jsonPermDf = Seq( // Construct sample dataframe
  (2, """{"Employee-Key":2, "Employee-Type":"Permanent", "Login-Date":"2021-11-01", "Level":3, "Validity":"ok"}""")
  , (3, """{"Employee-Key":3, "Employee-Type":"Permanent", "Login-Date":"2020-10-01", "Level":2, "Validity":"ok-yes"}""")
).toDF("key", "raw_json")

val permDf = jsonPermDf.withColumn("data", from_json(col("raw_json"), schemaPerm)).select($"data.*")
permDf.show()

For Contractor:

val jsonContDf = Seq(  // Construct sample dataframe
  (1, """{"Employee-Key":1, "Employee-Type":"Contractor", "Login-Date":"2021-12-01", "Vendor":"technicia", "HourlyRate":29}""")
  , (4, """{"Employee-Key":4, "Employee-Type":"Contractor", "Login-Date":"2019-09-01", "Vendor":"Minis", "HourlyRate":35}""")
).toDF("key", "raw_json")

val contDf = jsonContDf.withColumn("data", from_json(col("raw_json"), schemaCont)).select($"data.*")
contDf.show()

This is the resulting dataframe for Permanent:

+------------+-------------+----------+-----+--------+
|Employee-Key|Employee-Type|Login-Date|Level|Validity|
+------------+-------------+----------+-----+--------+
|           2|    Permanent|2021-11-01|    3|      ok|
|           3|    Permanent|2020-10-01|    2|  ok-yes|
+------------+-------------+----------+-----+--------+

This is the resulting dataframe for Contractor:

+------------+-------------+----------+---------+----------+
|Employee-Key|Employee-Type|Login-Date|   Vendor|HourlyRate|
+------------+-------------+----------+---------+----------+
|           1|   Contractor|2021-12-01|technicia|      29.0|
|           4|   Contractor|2019-09-01|    Minis|      35.0|
+------------+-------------+----------+---------+----------+
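
The samples above assume the whole record is JSON. If only employeeDetailsJson holds JSON, as in the question, the same two-schema idea can be applied per employee type; a sketch (assuming employeeTypeId 1 = Permanent and 2 = Contractor, as in the question's code):

// Sketch: a per-type schema for the details JSON only; split by employeeTypeId
// and parse each subset with the matching schema.
val detailsPerm = new StructType()
  .add("Grade", StringType).add("ValidTill", StringType).add("Supervisor", StringType)
val detailsCont = new StructType()
  .add("Vendor", StringType).add("HourlyRate", DoubleType)

val permDetailsDf = inputDF.filter(col("employeeTypeId") === "1")  // string column per the question's schema
  .withColumn("details", from_json(col("employeeDetailsJson"), detailsPerm))
  .select($"employeeKey", $"loginDate", $"details.*")

val contDetailsDf = inputDF.filter(col("employeeTypeId") === "2")
  .withColumn("details", from_json(col("employeeDetailsJson"), detailsCont))
  .select($"employeeKey", $"loginDate", $"details.*")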

Upvotes: 1

blackbishop

Reputation: 32720

If the schema of the JSON in employeeDetailsJson is unstable, you can still parse it into a map column using the from_json function with the schema map<string,string>. Then you can explode the map column and pivot it to get the keys as columns.

Example:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

val df1 = df.withColumn(
    "employeeDetails",
    from_json(col("employeeDetailsJson"), MapType(StringType, StringType))  // i.e. map<string,string>
  ).select(
    col("employeeKey"),
    col("employeeTypeId"),
    col("loginDate"),
    explode(col("employeeDetails"))  // produces `key` and `value` columns
  ).groupBy("employeeKey", "employeeTypeId", "loginDate")
  .pivot("key")
  .agg(first("value"))

df1.show()
//+-----------+--------------+---------------------+-----+----------+----------+----------+---------+
//|employeeKey|employeeTypeId|loginDate            |Grade|HourlyRate|Supervisor|ValidTill |Vendor   |
//+-----------+--------------+---------------------+-----+----------+----------+----------+---------+
//|1          |1             |2021-02-05'T'21:28:06|100  |29        |Alex      |2021-12-01|technicia|
//+-----------+--------------+---------------------+-----+----------+----------+----------+---------+
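
If only some keys are needed as columns (the question selects a subset per employee type), pivot also accepts an explicit list of values; this limits the output columns and skips the extra job Spark otherwise runs to collect the distinct keys. A sketch, with a key list chosen purely for illustration:

// Sketch: restrict the pivot to a known key list (hypothetical selection here).
val wantedKeys = Seq("Vendor", "HourlyRate")

val df2 = df.withColumn(
    "employeeDetails",
    from_json(col("employeeDetailsJson"), MapType(StringType, StringType))
  ).select(
    col("employeeKey"),
    col("employeeTypeId"),
    col("loginDate"),
    explode(col("employeeDetails"))
  ).groupBy("employeeKey", "employeeTypeId", "loginDate")
  .pivot("key", wantedKeys)  // only these keys become columns; no distinct-key scan
  .agg(first("value"))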

Upvotes: 1
