Reputation: 686
I have a data set with columns userId (String), itemId (int), and rating (int).
+--------+--------+--------+
| userId | itemId | rating |
+--------+--------+--------+
| abc13  |     23 |      1 |
| qwe34  |     56 |      3 |
| qwe34  |     35 |      4 |
+--------+--------+--------+
I want to map the string userIds to unique long values. I tried mapping the userIds using zipWithUniqueId(), which gives a pair RDD:
+--------+--------------+
| userId | userIdMapped |
+--------+--------------+
| abc13  |            0 |
| qwe34  |            1 |
+--------+--------------+
I want to add the long values as a new column and create the dataset as below:
+--------+--------+--------+--------------+
| userId | itemId | rating | userIdMapped |
+--------+--------+--------+--------------+
| abc13  |     23 |      1 |            0 |
| qwe34  |     56 |      3 |            1 |
| qwe34  |     35 |      4 |            1 |
+--------+--------+--------+--------------+
I tried the following:
JavaRDD<Feedback> feedbackRDD = spark.read().jdbc(MYSQL_CONNECTION_URL, feedbackQuery, connectionProperties)
        .javaRDD().map(Feedback.mapFunc);
JavaPairRDD<String, Long> mappedPairRDD = feedbackRDD.map(new Function<Feedback, String>() {
    public String call(Feedback p) throws Exception {
        return p.getUserId();
    }
}).distinct().zipWithUniqueId();
Dataset<Row> feedbackDS = spark.createDataFrame(feedbackRDD, Feedback.class);
Dataset<String> stringIds = spark.createDataset(mappedPairRDD.keys().collect(), Encoders.STRING());
Dataset<Long> valueIds = spark.createDataset(mappedPairRDD.values().collect(), Encoders.LONG());
Dataset<Row> longIds = valueIds.withColumnRenamed("value", "userIdMapped");
Dataset<Row> userIdMap = longIds.join(stringIds); // no join condition, so this is a cross join
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap,
        feedbackDS.col("userId").equalTo(userIdMap.col("value")), "inner");
// Here the 'value' column contains the string user ids
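For reference, the pairing between keys and values is lost once they are collected into two separate single-column datasets. A sketch of one way to keep each key with its value (assuming the mappedPairRDD above; scala.Tuple2, RowFactory, and the org.apache.spark.sql.types helpers would need to be imported, and the variable names here are illustrative):
JavaRDD<Row> mappingRows = mappedPairRDD.map(new Function<Tuple2<String, Long>, Row>() {
    public Row call(Tuple2<String, Long> t) throws Exception {
        // keep the key and its value together in a single Row
        return RowFactory.create(t._1(), t._2());
    }
});
StructType mappingSchema = DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("userId", DataTypes.StringType, false),
        DataTypes.createStructField("userIdMapped", DataTypes.LongType, false)
});
Dataset<Row> userIdMapDF = spark.createDataFrame(mappingRows, mappingSchema);
// joining on a real key avoids the Cartesian product described below
Dataset<Row> mappedFeedback = feedbackDS.join(userIdMapDF, "userId");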
The userIdMap dataset is joined incorrectly, as below; the join has no condition, so it produces a Cartesian product:
+--------------+-------+
| userIdMapped | value |
+--------------+-------+
|            0 | abc13 |
|            0 | qwe34 |
|            1 | abc13 |
|            1 | qwe34 |
+--------------+-------+
Therefore the resulting feedbackDSUserMapped is wrong.
I'm new to Spark and I'm sure there must be a better way of doing this.
What is the best way to get the long values from the pair RDD and attach them to the matching userId rows in the initial dataset (RDD)?
Any help would be much appreciated.
The data is to be used for ALS model.
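For context, spark.ml's ALS expects numeric user and item id columns (the ids must fit within the integer range), which is why the string ids need to be mapped first. A minimal sketch of the intended downstream use, where everything beyond the column names above is an assumption (imports from org.apache.spark.ml.recommendation):
// Sketch only: assumes a Dataset<Row> 'feedback' holding the mapped columns shown above.
ALS als = new ALS()
        .setUserCol("userIdMapped")
        .setItemCol("itemId")
        .setRatingCol("rating");
ALSModel model = als.fit(feedback);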
Upvotes: 1
Views: 531
Reputation: 686
Solved it using StringIndexer:
StringIndexer indexer = new StringIndexer()
        .setInputCol("userId")
        .setOutputCol("userIdMapped");
Dataset<Row> userJoinedDataSet = indexer.fit(feedbackDS).transform(feedbackDS);
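One caveat: StringIndexer writes its output column as a double. If a long is needed downstream, the index can be cast; a minimal sketch, assuming the userJoinedDataSet above:
// StringIndexer emits DoubleType; cast the generated index to long.
Dataset<Row> withLongIds = userJoinedDataSet.withColumn("userIdMapped",
        userJoinedDataSet.col("userIdMapped").cast("long"));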
Upvotes: 0
Reputation: 4948
You can try the following: assign a unique id using a built-in function, then join with the original dataset.
/**
 * Created by RGOVIND on 11/16/2016.
 */
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;

import java.util.ArrayList;
import java.util.List;

public class SparkUserObjectMain {
    static public void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Stack Overflow App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // seed the data
        List<UserObject> users = new ArrayList<UserObject>();
        UserObject user1 = new UserObject("abc13", "23", "1");
        UserObject user2 = new UserObject("qwe34", "56", "3");
        UserObject user3 = new UserObject("qwe34", "35", "4");
        users.add(user1);
        users.add(user2);
        users.add(user3);

        // encode the bean and create the user dataset
        Encoder<UserObject> userObjectEncoder = Encoders.bean(UserObject.class);
        Dataset<UserObject> usersDataSet = sqlContext.createDataset(users, userObjectEncoder);

        // assign unique ids to the distinct userIds
        Dataset<Row> uniqueUsersWithId = usersDataSet.dropDuplicates("userId")
                .select("userId")
                .withColumn("id", functions.monotonically_increasing_id());

        // join back with the original dataset on userId
        Dataset<Row> joinedDataSet = usersDataSet.join(uniqueUsersWithId, "userId");
        joinedDataSet.show();
    }
}
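The dropDuplicates("userId") step is what makes this work: ids are assigned to the distinct users first, so the join back gives every row for the same user the same id, as the output at the end shows.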
The bean:
/**
 * Created by RGOVIND on 11/16/2016.
 */
public class UserObject {
    private String userId;
    private String itemId;
    private String rating;

    public UserObject() {
    }

    public UserObject(String userId, String itemId, String rating) {
        this.userId = userId;
        this.itemId = itemId;
        this.rating = rating;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getRating() {
        return rating;
    }

    public void setRating(String rating) {
        this.rating = rating;
    }
}
Prints:
+------+------+------+------------+
|userId|itemId|rating| id|
+------+------+------+------------+
| abc13| 23| 1|403726925824|
| qwe34| 56| 3|901943132160|
| qwe34| 35| 4|901943132160|
+------+------+------+------------+
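Note that monotonically_increasing_id() guarantees ids that are unique and monotonically increasing, but not consecutive (the partition id is encoded in the upper bits), which is why the printed ids are large, non-sequential numbers.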
Upvotes: 1