Reputation: 425
First, I read a text file and turn it into RDD[(String,(String,Float))]:
import org.apache.spark.rdd.RDD
val data = sc.textFile(dataInputPath)
val dataRDD: RDD[(String, (String, Float))] = data.map { f =>
  val temp = f.split("//x01")
  // parse the rating as Float to match the declared element type
  (temp(0), (temp(1), temp(2).toFloat))
}
Then I run the following code to turn my data into the Rating type:
import org.apache.spark.mllib.recommendation.Rating
val imeiMap = dataRDD.reduceByKey((s1,s2)=>s1).collect().zipWithIndex.toMap;
val docidMap = dataRDD.map( f=>(f._2._1,1)).reduceByKey((s1,s2)=>s1).collect().zipWithIndex.toMap;
val ratings = dataRDD.map{case (imei, (doc_id,rating))=> Rating(imeiMap(imei),docidMap(doc_id),rating)};
But I got an error:
Error:(32, 77) type mismatch;
found : String
required: (String, (String, Float))
val ratings = dataRDD.map{case (imei, (doc_id,rating))=> Rating(imeiMap(imei),docidMap(doc_id),rating)};
Why does this happen? I thought the string had already been converted to (String, (String, Float)).
Upvotes: 2
Views: 1517
Reputation: 7928
The key of docidMap is not a String; it is a tuple (String, Int). This is because you call zipWithIndex before the .toMap method.
With this rdd as input for a quick test:
(String1,( String2,32.0))
(String1,( String2,35.0))
scala> val docidMap = dataRDD.map( f=>(f._2._1,1)).reduceByKey((s1,s2)=>s1).collect().zipWithIndex.toMap;
docidMap: scala.collection.immutable.Map[(String, Int),Int] = Map((" String2",1) -> 0)
scala> val docidMap = dataRDD.map( f=>(f._2._1,1)).reduceByKey((s1,s2)=>s1).collect().toMap;
docidMap: scala.collection.immutable.Map[String,Int] = Map(" String2" -> 1)
The same happens with your imeiMap; it seems that you just need to remove the zipWithIndex from there too:
val imeiMap = dataRDD.reduceByKey((s1,s2)=>s1).collect.toMap
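Note that with zipWithIndex removed, the map values are no longer unique integer ids (imeiMap would map each imei to a (String, Float) pair), while Rating expects Int user and product ids. A minimal sketch of one way to keep String keys and still get Int ids is to reduce to the distinct keys first and only then call zipWithIndex. It uses plain Scala collections as a stand-in for the collected RDD so it can be pasted into the Scala REPL or run as a script; the sample records and the local Rating case class are assumptions made for illustration, not part of the original code.

```scala
// Hypothetical sample records standing in for dataRDD.collect():
// each element is (imei, (doc_id, rating)).
val records: Seq[(String, (String, Float))] = Seq(
  ("imeiA", ("doc1", 3.0f)),
  ("imeiA", ("doc2", 4.0f)),
  ("imeiB", ("doc1", 5.0f))
)

// Keep only the distinct keys, then number them, so each map is String -> Int.
// With Spark, the same idea would be roughly:
//   dataRDD.keys.distinct().collect().zipWithIndex.toMap
val imeiMap: Map[String, Int] = records.map(_._1).distinct.zipWithIndex.toMap
val docidMap: Map[String, Int] = records.map(_._2._1).distinct.zipWithIndex.toMap

// Local stand-in (assumption) for org.apache.spark.mllib.recommendation.Rating,
// which has the same three fields.
case class Rating(user: Int, product: Int, rating: Double)

// Both lookups now take a String, so the original pattern match type-checks.
val ratings: Seq[Rating] = records.map { case (imei, (docId, rating)) =>
  Rating(imeiMap(imei), docidMap(docId), rating.toDouble)
}
```

For large key sets, collecting the maps to the driver may not be practical; in that case the ids could be assigned on the RDD itself, but that is outside what this thread covers.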
Upvotes: 2
Reputation: 1085
It is not about your dataRDD; it is about imeiMap:
imeiMap: scala.collection.immutable.Map[(String, (String, Float)),Int]
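To see where that key type comes from, here is a small sketch that follows the types step by step. It uses a plain Array as a stand-in for the result of reduceByKey(...).collect(); the sample record and the names collected, zipped, imeiMapAsWritten and idx are made up for illustration.

```scala
// Stand-in (assumption) for dataRDD.reduceByKey((s1, s2) => s1).collect()
val collected: Array[(String, (String, Float))] =
  Array(("imeiA", ("doc1", 3.0f)))

// zipWithIndex pairs each whole element with its position:
// ((imei, (doc_id, rating)), index)
val zipped: Array[((String, (String, Float)), Int)] = collected.zipWithIndex

// toMap takes the first component of each pair as the key,
// which here is the entire record rather than just the imei.
val imeiMapAsWritten: Map[(String, (String, Float)), Int] = zipped.toMap

// imeiMapAsWritten("imeiA") does not compile:
//   found: String, required: (String, (String, Float))
// Only a full record works as a key.
val idx: Int = imeiMapAsWritten(("imeiA", ("doc1", 3.0f)))
```

This is why the compiler reports the mismatch at imeiMap(imei): imei is a String, but the map is keyed by the full (String, (String, Float)) record.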
Upvotes: 2