biniam

Reputation: 8199

Spark: Combine two Java object RDDs into one

I have two JavaRDDs of the same object type, and I want to combine their data into a single RDD. The pieces are:

Domain

public class User {
    String name;
    String email;
    String profession;
    Integer age;

    // constructor

    // setters and getters
}

RDD 1

User user1 = new User ("Name", "[email protected]");
User user2 = new User ("Name2", "[email protected]");

List<User> userList = new ArrayList<>();
userList.add(user1);
userList.add(user2);

JavaRDD<User> leftUserJavaRDD = sc.parallelize(userList);

RDD 2

User user3 = new User ("[email protected]", "Software Engineer", 26);
User user4 = new User ("[email protected]", "Lawyer", 35);

List<User> userList2 = new ArrayList<>();
userList2.add(user3);
userList2.add(user4);

JavaRDD<User> rightUserJavaRDD = sc.parallelize(userList2);

I want to combine the two RDDs on their common email address. The combined RDD I expect is:

User user1and3 = new User (
        "Name",
        "[email protected]",
        "Software Engineer",
        26);

User user2and4 = new User (
        "Name2",
        "[email protected]",
        "Lawyer",
        35);

How can I do this in Spark using Java? I tried union and cartesian, but neither worked.

Upvotes: 1

Views: 2206

Answers (1)

biniam

Reputation: 8199

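I got help from a colleague, and here is the solution we arrived at: key both RDDs by email, union them, and reduce by key so that the two partial records for each email are merged into one.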

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

public JavaRDD<User> getCombinedUsers(JavaRDD<User> leftUserJavaRDD, JavaRDD<User> rightUserJavaRDD) {

    // Key each RDD by email so records for the same user share a key.
    JavaPairRDD<String, User> leftUserJavaPairRDD =
            leftUserJavaRDD.mapToPair(user -> new Tuple2<>(user.getEmail(), user));

    JavaPairRDD<String, User> rightUserJavaPairRDD =
            rightUserJavaRDD.mapToPair(user -> new Tuple2<>(user.getEmail(), user));

    // Union the keyed RDDs, then merge the records that share an email.
    return leftUserJavaPairRDD
            .union(rightUserJavaPairRDD)
            .reduceByKey(merge)
            .values();
}

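/**
 * Reduce function that merges two partial User records sharing the same email.
 * reduceByKey expects a commutative function and does not guarantee which record
 * arrives first, so each field is taken from whichever side has a non-null value
 * (this assumes fields not set by the constructor are left null).
 */
private static Function2<User, User, User> merge =
        (User a, User b) -> new User(
                a.getName() != null ? a.getName() : b.getName(),
                a.getEmail(),
                a.getProfession() != null ? a.getProfession() : b.getProfession(),
                a.getAge() != null ? a.getAge() : b.getAge());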
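
One thing worth checking locally: reduceByKey is documented as taking an associative and commutative function, so the merge should give the same result whichever record comes first. Below is a minimal plain-Java sketch (no Spark) that imitates union plus reduceByKey with a map keyed by email. The class name UserMergeSketch, the helper firstNonNull, the simplified User class, and the example.com addresses are all hypothetical stand-ins, and the sketch assumes that missing fields are null and that the two records never disagree on a non-null field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class UserMergeSketch {

    // Simplified stand-in for the question's User class (hypothetical).
    static class User {
        final String name;
        final String email;
        final String profession;
        final Integer age;

        User(String name, String email, String profession, Integer age) {
            this.name = name;
            this.email = email;
            this.profession = profession;
            this.age = age;
        }
    }

    // Returns a when it is non-null, otherwise b.
    static <T> T firstNonNull(T a, T b) {
        return a != null ? a : b;
    }

    // Order-independent merge: each field comes from whichever side has it.
    static User merge(User a, User b) {
        return new User(
                firstNonNull(a.name, b.name),
                a.email, // both records share the same email key
                firstNonNull(a.profession, b.profession),
                firstNonNull(a.age, b.age));
    }

    // Local imitation of union + reduceByKey: group by email, merge on collision.
    static List<User> combine(List<User> left, List<User> right) {
        Map<String, User> byEmail = new LinkedHashMap<>();
        for (User u : left) {
            byEmail.merge(u.email, u, UserMergeSketch::merge);
        }
        for (User u : right) {
            byEmail.merge(u.email, u, UserMergeSketch::merge);
        }
        return new ArrayList<>(byEmail.values());
    }

    public static void main(String[] args) {
        List<User> left = Arrays.asList(
                new User("Name", "a@example.com", null, null),
                new User("Name2", "b@example.com", null, null));
        List<User> right = Arrays.asList(
                new User(null, "a@example.com", "Software Engineer", 26),
                new User(null, "b@example.com", "Lawyer", 35));

        for (User u : combine(left, right)) {
            System.out.println(u.name + " " + u.email + " " + u.profession + " " + u.age);
        }
    }
}
```

A user whose email appears in only one of the two inputs passes through unmerged, which matches how reduceByKey treats a key with a single value.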

Upvotes: 1
