Anis Smail

Reputation: 779

How to flatten a nested struct in an array?

This is my current schema:

 |-- _id: string (nullable = true)
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- adr1: struct (nullable = true)
 |    |    |    |-- resid: string (nullable = true)

And this is what I want to obtain:

 |-- _id: string (nullable = true)
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- resid: string (nullable = true)

I am using the Java API.

Upvotes: 1

Views: 910

Answers (2)

Piotr Kalański

Reputation: 689

You can use a map transformation:

import java.util.stream.Collectors;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

Encoder<PeopleFlatten> peopleFlattenEncoder = Encoders.bean(PeopleFlatten.class);

people
  // the cast selects the MapFunction overload of map
  // (a bare lambda is ambiguous in Java); People is the input bean, sketched below
  .map((MapFunction<People, PeopleFlatten>) person -> new PeopleFlatten(
      person.get_id(),
      person.getPerson().stream().map(p ->
        new PersonFlatten(
          p.getName(),
          p.getAdr1().getResid()
        )
      ).collect(Collectors.toList())
    ),
    peopleFlattenEncoder
  );

where PeopleFlatten and PersonFlatten are POJOs corresponding to the expected schema in the question:

import java.io.Serializable;
import java.util.List;

public class PeopleFlatten implements Serializable {
   private String _id;
   private List<PersonFlatten> person;
   // no-arg constructor, all-args constructor, getters and setters
   // (Encoders.bean requires a public no-arg constructor and getters/setters)
}

public class PersonFlatten implements Serializable {
   private String name;
   private String resid;
   // no-arg constructor, all-args constructor, getters and setters
}
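
For reference, the input-side beans implied by the accessors above could look like the following sketch (the class names People, Person and Adr1 are assumptions, derived from the getters used in the map):

import java.io.Serializable;
import java.util.List;

// assumed input beans matching the original nested schema
public class People implements Serializable {
   private String _id;
   private List<Person> person;
   // no-arg constructor, getters and setters
}

public class Person implements Serializable {
   private String name;
   private Adr1 adr1;
   // no-arg constructor, getters and setters
}

public class Adr1 implements Serializable {
   private String resid;
   // no-arg constructor, getters and setters
}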

Upvotes: 3

Jacek Laskowski

Reputation: 74619

If it were Scala, I'd do the following, but since the OP asked about Java, I'm offering it as guidance only.

Solution 1 - Memory-Heavy

case class Address(resid: String)
case class Person(name: String, adr1: Address)

import spark.implicits._ // for toDF and as[...]; already in scope in spark-shell

val people = Seq(
  ("one", Array(Person("hello", Address("1")), Person("world", Address("2"))))
).toDF("_id", "persons")

people.as[(String, Array[Person])].map { case (_id, arr) =>
  (_id, arr.map { case Person(name, Address(resid)) => (name, resid) })
}

(Note that mapping to plain tuples gives the inner fields the default names _1 and _2; mapping to a small case class instead, e.g. PersonFlat(name, resid), would preserve the target field names.)

This approach, however, is quite memory-expensive, as the internal binary rows are copied into JVM objects, which can push the environment into OutOfMemoryErrors.

Solution 2 - Expensive but Language-Independent

An alternative query with worse performance (but a smaller memory requirement) uses the explode operator to destructure the array first, which gives easy access to the internal structs.

import org.apache.spark.sql.functions._ // for explode, struct, collect_list

val solution = people.
  select($"_id", explode($"persons") as "exploded"). // <-- that's expensive
  select("_id", "exploded.*"). // <-- this is the trick to access struct's fields
  select($"_id", $"name", $"adr1.resid").
  select($"_id", struct("name", "resid") as "person").
  groupBy("_id"). // <-- that's expensive
  agg(collect_list("person") as "persons")
scala> solution.printSchema
root
 |-- _id: string (nullable = true)
 |-- persons: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- resid: string (nullable = true)

The nice thing about this solution is that it relies on almost nothing specific to Scala or Java, so you can use it right away regardless of your language of choice.
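
To make that concrete for the OP, the same chain of operators might look like this with the Java API (a sketch, assuming people is a Dataset<Row> with the schema above and an array column named persons):

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// the same explode / re-assemble pipeline, expressed through the untyped Java DSL
Dataset<Row> solution = people
  .select(col("_id"), explode(col("persons")).as("exploded"))
  .select("_id", "exploded.*")
  .select(col("_id"), col("name"), col("adr1.resid"))
  .select(col("_id"), struct("name", "resid").as("person"))
  .groupBy("_id")
  .agg(collect_list("person").as("persons"));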

Upvotes: 2
