Fleur

Reputation: 686

How to combine Spark RDD and PairRDD in Java

I have a data set with columns userId (String), itemId (int) and rating (int).

+----------+----------+---------+
| userId   |  itemId  |  rating |
+----------+----------+---------+
|  abc13   |    23    |    1    |
+----------+----------+---------+
|  qwe34   |    56    |    3    |
+----------+----------+---------+
|  qwe34   |    35    |    4    |
+----------+----------+---------+

I want to map the string userIds to unique long values. I tried mapping the userIds using zipWithUniqueId(), which gives a JavaPairRDD<String, Long>.

+------------+----------------+
|   userId   |  userIdMapped  |
+------------+----------------+
|    abc13   |        0       |   
+------------+----------------+
|    qwe34   |        1       |   
+------------+----------------+

I want to add the long values as a new column and create a dataset like the one below:

+----------+----------+---------+----------------+
| userId   |  itemId  |  rating |  userIdMapped  |
+----------+----------+---------+----------------+
|  abc13   |    23    |    1    |       0        |
+----------+----------+---------+----------------+
|  qwe34   |    56    |    3    |       1        |
+----------+----------+---------+----------------+
|  qwe34   |    35    |    4    |       1        |
+----------+----------+---------+----------------+

I tried the following:

JavaRDD<Feedback> feedbackRDD = spark.read().jdbc(MYSQL_CONNECTION_URL, feedbackQuery, connectionProperties)
            .javaRDD().map(Feedback.mapFunc);
JavaPairRDD<String, Long> mappedPairRDD = feedbackRDD.map(new Function<Feedback, String>() {
    public String call(Feedback p) throws Exception {
        return p.getUserId();
    }
}).distinct().zipWithUniqueId();
Dataset<Row> feedbackDS = spark.createDataFrame(feedbackRDD, Feedback.class);
Dataset<String> stringIds = spark.createDataset(mappedPairRDD.keys().collect(), Encoders.STRING());
Dataset<Long> valueIds = spark.createDataset(mappedPairRDD.values().collect(), Encoders.LONG());
Dataset<Row> longIds = valueIds.withColumnRenamed("value", "userIdMapped");
Dataset<Row> userIdMap = longIds.join(stringIds);
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, feedbackDS.col("userId").equalTo(userIdMap.col("value")),
            "inner");
//Here 'value' column contains string user ids

The userIdMap dataset is joined incorrectly as below:

+-----------------+----------------+
|   userIdMapped  |     value      |
+-----------------+----------------+
|         0       |     abc13      |   
+-----------------+----------------+
|         0       |     qwe34      |   
+-----------------+----------------+
|         1       |     abc13      |   
+-----------------+----------------+
|         1       |     qwe34      |   
+-----------------+----------------+

Therefore the resulting feedbackDSUserMapped is wrong: since longIds.join(stringIds) has no join condition, userIdMap ends up as the cross product of the two columns.
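
For reference, the closest I have come up with is to turn the pair RDD into a single two-column DataFrame (so the string id and the long id stay on the same row) and then join on userId, roughly as below; the column names are my own and I haven't verified this:

//assumes scala.Tuple2, org.apache.spark.sql.RowFactory and org.apache.spark.sql.types.{DataTypes, StructType}
JavaRDD<Row> idRows = mappedPairRDD.map(new Function<Tuple2<String, Long>, Row>() {
    public Row call(Tuple2<String, Long> t) throws Exception {
        return RowFactory.create(t._1(), t._2());
    }
});
StructType idSchema = new StructType()
        .add("userId", DataTypes.StringType)
        .add("userIdMapped", DataTypes.LongType);
Dataset<Row> userIdMap = spark.createDataFrame(idRows, idSchema);
//join on the shared userId column instead of collecting keys and values separately
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, "userId");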

I'm new to Spark and I'm sure there must be a better way of doing this.

What is the best way to get the long values from the pair RDD and attach them to the matching userId in the initial dataset (RDD)?

Any help would be much appreciated.

The data is to be used for an ALS model.

Upvotes: 1

Views: 531

Answers (2)

Fleur

Reputation: 686

Solved it using StringIndexer:

StringIndexer indexer = new StringIndexer()
              .setInputCol("userId")
              .setOutputCol("userIdMapped");
Dataset<Row> userJoinedDataSet = indexer.fit(feedbackDS).transform(feedbackDS);
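
Note that StringIndexer writes the index as a double-typed column. As far as I can tell ALS in spark.ml accepts that (the ids just need to fit within the integer range), but the column can also be cast explicitly if integral ids are preferred; a small, untested snippet:

//optional cast of the double index to an int (assumes org.apache.spark.sql.functions is imported)
Dataset<Row> alsReadyDataSet = userJoinedDataSet
        .withColumn("userIdMapped", functions.col("userIdMapped").cast("int"));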

Upvotes: 0

Ramachandran.A.G

Reputation: 4948

You can try the following: assign a unique id using a built-in function and then join with the original dataset.

/**
 * Created by RGOVIND on 11/16/2016.
 */

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;

import java.util.ArrayList;
import java.util.List;

public class SparkUserObjectMain {
    static public void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Stack Overflow App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<UserObject> users = new ArrayList<UserObject>();

        //seed the data
        UserObject user1 = new UserObject("abc13", "23", "1");
        UserObject user2 = new UserObject("qwe34", "56", "3");
        UserObject user3 = new UserObject("qwe34", "35", "4");
        users.add(user1);
        users.add(user2);
        users.add(user3);

        //how to encode the object ?
        Encoder<UserObject> userObjectEncoder = Encoders.bean(UserObject.class);
        //Create the user dataset
        Dataset<UserObject> usersDataSet = sqlContext.createDataset(users, userObjectEncoder);
        //assign unique id's
        Dataset<Row> uniqueUsersWithId = usersDataSet.dropDuplicates("userId").select("userId").withColumn("id", functions.monotonically_increasing_id());
        //join with original
        Dataset<Row> joinedDataSet = usersDataSet.join(uniqueUsersWithId, "userId");
        joinedDataSet.show();

    }
}

The bean:

/**
 * Created by RGOVIND on 11/16/2016.
 */
public class UserObject {

    private String userId;
    private String itemId;
    private String rating;

    public UserObject() {
    }

    public UserObject(String userId, String itemId, String rating) {
        this.userId = userId;
        this.itemId = itemId;
        this.rating = rating;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getRating() {
        return rating;
    }

    public void setRating(String rating) {
        this.rating = rating;
    }

}

Prints:

+------+------+------+------------+
|userId|itemId|rating|          id|
+------+------+------+------------+
| abc13|    23|     1|403726925824|
| qwe34|    56|     3|901943132160|
| qwe34|    35|     4|901943132160|
+------+------+------+------------+
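
Note that monotonically_increasing_id() only guarantees unique ids, not consecutive ones, as the output above shows. If consecutive ids are needed, one alternative (an untested sketch, and it forces the distinct userIds through a single partition for the ordering) is a window-based dense_rank:

//assumes org.apache.spark.sql.expressions.Window; ids start at 1
Dataset<Row> uniqueUsersWithDenseId = usersDataSet
        .dropDuplicates("userId")
        .select("userId")
        .withColumn("id", functions.dense_rank().over(Window.orderBy("userId")));
Dataset<Row> joinedWithDenseId = usersDataSet.join(uniqueUsersWithDenseId, "userId");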

Upvotes: 1
