Reputation: 779
This is my current schema:
|-- _id: string (nullable = true)
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- adr1: struct (nullable = true)
| | | |-- resid: string (nullable = true)
And this is what I want to obtain:
|-- _id: string (nullable = true)
|-- person: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- resid: string (nullable = true)
I am using the Java API.
Upvotes: 1
Views: 910
Reputation: 689
You can use a map transformation:
import java.util.stream.Collectors;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// bean encoder for the flattened result type
Encoder<PeopleFlatten> peopleFlattenEncoder = Encoders.bean(PeopleFlatten.class);

people.map(person -> new PeopleFlatten(
    person.get_id(),
    // flatten each element: keep the name, pull resid up out of adr1
    person.getPerson().stream()
        .map(p -> new PersonFlatten(p.getName(), p.getAdr1().getResid()))
        .collect(Collectors.toList())
  ),
  peopleFlattenEncoder
);
where PeopleFlatten and PersonFlatten are POJOs corresponding to the expected schema in the question.
import java.io.Serializable;
import java.util.List;

public class PeopleFlatten implements Serializable {
    private String _id;
    private List<PersonFlatten> person;
    // no-arg constructor, all-args constructor, getters and setters
}

public class PersonFlatten implements Serializable {
    private String name;
    private String resid;
    // no-arg constructor, all-args constructor, getters and setters
}
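Note that Encoders.bean requires a public no-arg constructor plus getters and setters, while the map call above also uses the all-args constructors. The call further assumes people is a typed Dataset whose bean classes mirror the original schema. A minimal sketch of those input POJOs (the names People, Person and Adr1 are assumptions, not given in the question) could look like:

import java.io.Serializable;
import java.util.List;

// Hypothetical input beans mirroring the question's schema;
// the class names are illustrative assumptions.
public class People implements Serializable {
    private String _id;
    private List<Person> person;
    // no-arg constructor, getters and setters
}

public class Person implements Serializable {
    private String name;
    private Adr1 adr1;
    // no-arg constructor, getters and setters
}

public class Adr1 implements Serializable {
    private String resid;
    // no-arg constructor, getters and setters
}

With such beans, the source could be loaded, for example, via spark.read().json(path).as(Encoders.bean(People.class)) — again an assumption about where the data comes from.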
Upvotes: 3
Reputation: 74619
If it were Scala, I'd do the following, but since the OP asked about Java, I'm offering it as guidance only.
case class Address(resid: String)
case class Person(name: String, adr1: Address)
val people = Seq(
("one", Array(Person("hello", Address("1")), Person("world", Address("2"))))
).toDF("_id", "persons")
import org.apache.spark.sql.Row
people.as[(String, Array[Person])].map { case (_id, arr) =>
(_id, arr.map { case Person(name, Address(resid)) => (name, resid) })
}
This approach, however, is quite memory-expensive, as the internal binary rows are all copied into JVM objects, which can push the environment into OutOfMemoryErrors.
An alternative query with worse performance (but a smaller memory requirement) uses the explode operator to destructure the array first, which gives easy access to the inner structs.
val solution = people.
select($"_id", explode($"persons") as "exploded"). // <-- that's expensive
select("_id", "exploded.*"). // <-- this is the trick to access struct's fields
select($"_id", $"name", $"adr1.resid").
select($"_id", struct("name", "resid") as "person").
groupBy("_id"). // <-- that's expensive
agg(collect_list("person") as "persons")
scala> solution.printSchema
root
|-- _id: string (nullable = true)
|-- persons: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- resid: string (nullable = true)
The nice thing about the solution is that it has almost nothing specific to Scala or Java, so you can use it right away regardless of your language of choice.
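Since the question is about the Java API, here is a rough, untested Java sketch of the same DataFrame-only pipeline. It assumes people is a Dataset<Row> with the question's schema (so the array column is named person rather than the persons used in the Scala demo); the variable names are illustrative:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> solution = people
    .select(col("_id"), explode(col("person")).as("exploded"))   // <-- that's expensive
    .select("_id", "exploded.*")                                  // <-- the trick to access the struct's fields
    .select(col("_id"), col("name"), col("adr1.resid"))
    .select(col("_id"), struct("name", "resid").as("person"))
    .groupBy("_id")                                               // <-- that's expensive
    .agg(collect_list("person").as("person"));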
Upvotes: 2