Reputation: 302
I have data in an input text file. It contains the input data in the format "PriceId, DateTime, PriceForUSA, PriceForUK, PriceForAUS".
It looks like this:
0000002,11-05-08-2016,0.92,1.68,0.81
0000003,07-05-08-2016,0.80,1.05,1.49
0000008,07-05-08-2016,1.43,1.29,1.22
The list of countries is fixed (USA, UK, AUS), and the order of prices in lines is fixed too (PriceForUSA, PriceForUK, PriceForAUS).
I read this data from the file using the Spark context and transform it into an RDD[List[String]]. Every List in my RDD represents one line from the input text file.
For example,
the first List contains the Strings
"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"
the second List contains the Strings
"0000003", "07-05-08-2016" , "0.80", "1.05" , "1.49"
etc.
I also have the custom class PriceInfo
case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {
override def toString: String = s"$priceId,$priceDate,$country,$price"
}
It is not difficult to transform every List[String] into an object of this class (I can do that already), but in this case my task is to get several custom objects from every single List[String].
For example, the List which contains
"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"
should be transformed to:
PriceInfo("0000002", "11-05-08-2016", "USA", 0.92)
PriceInfo("0000002", "11-05-08-2016", "UK", 1.68)
PriceInfo("0000002", "11-05-08-2016", "AUS", 0.81)
And every List[String] in my RDD[List[String]] must be "split" into several PriceInfo objects in the same way.
The result should be an RDD[PriceInfo].
The only solution that came to my mind is to iterate over the RDD[List[String]] with the foreach() function, create 3 PriceInfo objects in every iteration, add all created objects to a List[PriceInfo], and then pass this result list to SparkContext.parallelize(List...).
Something like this:
rawPricesList.foreach(list => {
  //...create PriceInfo1 from list
  //...create PriceInfo2 from list
  //...create PriceInfo3 from list
  //...add them all to a result List[PriceInfo]
})
//...sc.parallelize(List[PriceInfo]...)
But such a solution has many shortcomings.
The main problem is that it will not work if we have no reference to the SparkContext, for example, if we have a method getPrices() which has only one parameter - an RDD[List[String]].
def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {
  rawPricesList.foreach(list => {
    //...create PriceInfo1 from list
    //...create PriceInfo2 from list
    //...create PriceInfo3 from list
    //...add them all to a result List[PriceInfo]
  })
  //...but we can't call sc.parallelize(List...) here, because there is no SparkContext sc among the method parameters
}
In addition, it seems to me that Scala offers a more elegant solution.
I tried to find similar examples in the books "Scala for the Impatient" and "Learning Spark: Lightning-Fast Big Data Analysis", but unfortunately did not find anything covering this case. I would be very grateful for any help and tips.
Upvotes: 2
Views: 1233
Reputation: 22449
Here's one approach:
1. zip the fixed country list with the parsed price values of each line
2. generate the PriceInfo objects using flatMap
Example code below:
case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {
override def toString: String = s"$priceId,$priceDate,$country,$price"
}
val countryList = List("USA", "UK", "AUS")
val rdd = sc.textFile("/path/to/textfile").
  map( _.split(",") ).
  map{ case Array(id, date, p1, p2, p3) =>
    (id, date, countryList.zip(List(p1.toDouble, p2.toDouble, p3.toDouble)))
  }.
  flatMap{ case (id, date, countryPrices) =>
    countryPrices.map( cp => PriceInfo(id, date, cp._1, cp._2) )
  }
// rdd: org.apache.spark.rdd.RDD[PriceInfo] = ...
rdd.collect
// res1: Array[PriceInfo] = Array(
// 0000002,11-05-08-2016,USA,0.92,
// 0000002,11-05-08-2016,UK,1.68,
// 0000002,11-05-08-2016,AUS,0.81,
// 0000003,07-05-08-2016,USA,0.8,
// 0000003,07-05-08-2016,UK,1.05,
// 0000003,07-05-08-2016,AUS,1.49,
// 0000008,07-05-08-2016,USA,1.43,
// 0000008,07-05-08-2016,UK,1.29,
// 0000008,07-05-08-2016,AUS,1.22
// )
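If your data is already an RDD[List[String]] rather than a text file, the same zip/flatMap logic can be applied directly inside the getPrices method from your question; since flatMap is an ordinary RDD transformation, no reference to the SparkContext is needed. A minimal sketch, assuming every inner List has exactly the five fields in the fixed order and the PriceInfo case class defined above:
import org.apache.spark.rdd.RDD

def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {
  val countryList = List("USA", "UK", "AUS")
  rawPricesList.flatMap {
    // each 5-element list yields one PriceInfo per country
    case List(id, date, p1, p2, p3) =>
      countryList.zip(List(p1.toDouble, p2.toDouble, p3.toDouble)).map {
        case (country, price) => PriceInfo(id, date, country, price)
      }
  }
}
Because flatMap already returns an RDD[PriceInfo], there is no need to collect anything to the driver or to call sc.parallelize at all.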
Upvotes: 2