Ravi

Reputation: 137

Spark scala derive column from array columns based on rules

I am new to Spark and Scala. I have JSON input containing an array of structs, similar to the schema below.

root
|-- entity: struct (nullable = true)
|    |-- email: string (nullable = true)
|    |-- primaryAddresses: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- postalCode: string (nullable = true)
|    |    |    |-- streetAddress: struct (nullable = true)
|    |    |    |    |-- line1: string (nullable = true)

I flattened the array struct into the sample DataFrame below:

+-------------+--------------------------------------+--------------------------------------+
|entity.email |entity.primaryAddresses[0].postalCode |entity.primaryAddresses[1].postalCode |....
+-------------+--------------------------------------+--------------------------------------+
|[email protected]      |                                      |                                      |
|[email protected]      |                                      |12345                                 |
|[email protected]      |12345                                 |                                      |
|[email protected]      |0                                     |0                                     |
+-------------+--------------------------------------+--------------------------------------+

My end goal is to calculate presence/absence/zero counts for each of the columns for data quality metrics. But before I calculate those metrics, I am looking for an approach to derive one new column for each group of array element columns, as shown below.

Below is a sample intermediate DataFrame that I am trying to achieve, with one column derived per set of array elements (1 if any element has a value, 0 if the only values are zero, blank if all elements are absent). The original array element columns are dropped.

+-------------+--------------------------------------+
|entity.email |entity.primaryAddresses.postalCode    |.....
+-------------+--------------------------------------+
|[email protected]      |                                      |
|[email protected]      |1                                     |
|[email protected]      |1                                     |
|[email protected]      |0                                     |
+-------------+--------------------------------------+

The elements in the input JSON records are dynamic and can change. To derive a column for each array element, I build a Scala map whose key is the column name without the array index (example: entity.primaryAddresses.postalCode) and whose value is the list of indexed element columns to run the rules on for that key. I am looking for an approach to achieve the intermediate data frame above.
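
For reference, the map I build looks roughly like this (the names here are just examples taken from the schema above):

// Key: column name without the array index
// Value: the indexed element columns to apply the rules to for that key
val columnRules: Map[String, Seq[String]] = Map(
  "entity.primaryAddresses.postalCode" -> Seq(
    "entity.primaryAddresses[0].postalCode",
    "entity.primaryAddresses[1].postalCode"),
  "entity.primaryAddresses.streetAddress.line1" -> Seq(
    "entity.primaryAddresses[0].streetAddress.line1",
    "entity.primaryAddresses[1].streetAddress.line1"))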

One concern is that for certain input files, after I flatten the DataFrame the column count exceeds 70,000. And since the record count is expected to be in the millions, I am wondering whether, instead of flattening the JSON, I should explode each of the array elements for better performance, for example as sketched below.
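
The explode-based alternative I have in mind would look something like this (simplified to just the postalCode field; inputDf and the column names are illustrative):

import org.apache.spark.sql.functions._

// One row per (email, address) pair instead of one column per array index
val exploded = inputDf.select(
  col("entity.email").as("email"),
  explode_outer(col("entity.primaryAddresses")).as("address"))

// Collapse back to one flag per record: 1 if any address has a real postal code,
// 0 if the only values are "0", null if every element is absent
val flags = exploded
  .groupBy("email")
  .agg(max(
    when(trim(col("address.postalCode")) =!= "" && trim(col("address.postalCode")) =!= "0", 1)
      .when(trim(col("address.postalCode")) === "0", 0)
  ).as("postalCode_flag"))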

Appreciate any ideas. Thank you.

Upvotes: 0

Views: 880

Answers (2)

s.polam

Reputation: 10362

I created a helper function so that you can call df.explodeColumns directly on a DataFrame. The code below will flatten multi-level array & struct type columns.

Use the function below to extract the columns, then apply your transformations to them.

scala> df.printSchema
root
 |-- entity: struct (nullable = false)
 |    |-- email: string (nullable = false)
 |    |-- primaryAddresses: array (nullable = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- postalCode: string (nullable = false)
 |    |    |    |-- streetAddress: struct (nullable = false)
 |    |    |    |    |-- line1: string (nullable = false)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec

implicit class DFHelpers(df: DataFrame) {
    // Expand every top-level struct column into one column per field,
    // renaming "struct.field" to "struct_field"; non-struct columns pass through.
    def columns = {
      df.schema.fields.flatMap {
        case column if column.dataType.isInstanceOf[StructType] =>
          column.dataType.asInstanceOf[StructType].fields.map { field =>
            col(s"${column.name}.${field.name}").as(s"${column.name}_${field.name}")
          }.toList
        case column => List(col(s"${column.name}"))
      }
    }

    // Recursively flatten struct columns until no struct columns remain.
    def flatten: DataFrame = {
      val noStructs = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      if (noStructs) df else df.select(columns: _*).flatten
    }

    // Flatten structs, then repeatedly explode array columns (re-flattening the
    // structs they expose) until neither arrays nor structs are left.
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame): DataFrame =
        cdf.schema.fields.filter(_.dataType.typeName == "array") match {
          case c if c.nonEmpty =>
            columns(c.foldLeft(cdf) { (dfa, field) =>
              dfa.withColumn(field.name, explode_outer(col(s"${field.name}"))).flatten
            })
          case _ => cdf
        }
      columns(df.flatten)
    }
}
scala> df.explodeColumns.printSchema
root
 |-- entity_email: string (nullable = false)
 |-- entity_primaryAddresses_postalCode: string (nullable = true)
 |-- entity_primaryAddresses_streetAddress_line1: string (nullable = true)
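
From there you can compute presence / zero / absence counts, for example like below (illustrative only; counts are per exploded row & the exact rules are up to you):

val pc = col("entity_primaryAddresses_postalCode")

val metrics = df.explodeColumns.agg(
  count(when(pc.isNotNull && trim(pc) =!= "" && trim(pc) =!= "0", 1)).as("postalCode_present"),
  count(when(trim(pc) === "0", 1)).as("postalCode_zero"),
  count(when(pc.isNull || trim(pc) === "", 1)).as("postalCode_absent"))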

Upvotes: 2

dumitru

Reputation: 2108

You can leverage a custom user-defined function (UDF) to help you compute the data quality metrics.

val postalUdf = udf((postalCode0: Int, postalCode1: Int) => {
  // TODO implement your logic here
})

Then use it to create a new DataFrame column:

df
  .withColumn("postalCode", postalUdf(col("postalCode_0"), col("postalCode_1")))
  .show()
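
For instance, since postalCode is a string in the schema above, a sketch of the presence/zero rule from the question could look like this (illustrative only, assuming null or empty means absent):

import org.apache.spark.sql.functions.udf

// null/empty => absent (null), all zeros => "0", otherwise => "1"
val postalUdf = udf((postalCode0: String, postalCode1: String) => {
  val values = Seq(postalCode0, postalCode1).filter(v => v != null && v.trim.nonEmpty)
  if (values.isEmpty) null
  else if (values.forall(_.trim == "0")) "0"
  else "1"
})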

Upvotes: 1
