Ankit Agrahari

Reputation: 379

Exploding Nested Struct In Spark Dataframe having Different Schema

I have a JSON file with the following schema:

 |-- Pool: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- Alias: string (nullable = true)
 |    |    |-- Chaddr: string (nullable = true)
 |    |    |-- ChaddrMask: string (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)

The output I am trying to achieve is:

 PoolId ClientID Client_Active
 1      1        true
 1      2        false
 2      1        true

The schema changes from one JSON file to the next. For example, this file has 2 Pool IDs, but another file may have 5, and the same is true of the Client IDs.

The problems are:

  1. We can't use explode on a struct.
  2. Pool can't be converted to a Map, because each pool has a different set of Client IDs, which leads to a different schema for each row.

Any thoughts on how to achieve this?

I have tried the approach from this link, converting the struct to a Map and then exploding it, but it doesn't work when different Pools have different numbers of Client IDs.

Upvotes: 0

Views: 391

Answers (1)

baitmbarek

Reputation: 2518

From my perspective, you only need to define a UDF.

Here's an example:

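  1. Define a projection case class (the structure you want as the result):
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
  2. Define a UDF like the one below, which lets you work with both the structure (field names) and the data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{explode, udf}

val myUdf = udf{r: Row =>
  // Each field of the Pool struct is one pool; the field name is the pool ID.
  r.schema.fields.flatMap{rf =>
    val poolId = rf.name
    val pool = r.getAs[Row](poolId)
    val clientRow = pool.getAs[Row]("Client")
    // Each field of the Client struct is one client; the field name is the client ID.
    clientRow.schema.fields.map{cr =>
      val clientId = cr.name
      val isActive = clientRow.getAs[Row](clientId).getAs[Boolean]("Active")
      Projection(poolId, clientId, isActive)
    }
  }
}
  3. Use your UDF:
// Assumes a SparkSession named `spark` is in scope; needed for the $"..." syntax.
import spark.implicits._

val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
    .select("projection.*")
    .cache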

newDF.show(false)

The output is as expected:

+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1     |1       |true         |
|1     |2       |false        |
|2     |1       |true         |
+------+--------+-------------+
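
If the nested flatMap/map in the UDF is hard to follow, here is a minimal sketch of the same traversal on plain Scala collections, with no Spark involved. It is only an illustration: the `Pools` and `Clients` type aliases and the `FlattenSketch` object are hypothetical names, and a nested Map stands in for the Row structure, keeping only the Active flag per client.

```scala
// Plain-Scala model of the UDF's traversal (illustrative only, not Spark code).
// Assumption: each pool is reduced to a map of clientId -> Active flag.
final case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)

object FlattenSketch {
  type Clients = Map[String, Boolean] // clientId -> Active
  type Pools   = Map[String, Clients] // poolId   -> clients

  // Outer flatMap walks the pools, inner map walks each pool's clients,
  // so any number of pools and clients collapses into one flat sequence.
  def flatten(pools: Pools): Seq[Projection] =
    pools.toSeq.sortBy(_._1).flatMap { case (poolId, clients) =>
      clients.toSeq.sortBy(_._1).map { case (clientId, active) =>
        Projection(poolId, clientId, active)
      }
    }

  def main(args: Array[String]): Unit = {
    val pools: Pools = Map(
      "1" -> Map("1" -> true, "2" -> false),
      "2" -> Map("1" -> true)
    )
    flatten(pools).foreach(println)
  }
}
```

In the UDF, the field names of the Row schema play the role of the map keys here, which is why it copes with a different number of Pool and Client IDs in each file.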

Upvotes: 4
