I have data like the following in a file and I'd like to compute some statistics on it with Spark.
File content:
aaa|bbb|ccc
ddd|eee|fff|ggg
I need to assign each line an id, so I read the file as an RDD and call zipWithIndex().
The pairs should then look like:
(0, aaa|bbb|ccc)
(1, ddd|eee|fff|ggg)
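Note: zipWithIndex actually pairs each element with its index as (element, index), so to get the (id, line) shape shown above you can swap each tuple:

val fileWithIdRDD = fileRDD.zipWithIndex().map(_.swap)  // (index, line)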
I need to associate every string with its line id. I can get an RDD of Array(Row), but I can't flatten the arrays into individual rows.
How should I modify my code?
import org.apache.spark.sql.{Row, SparkSession}

val fileRDD = spark.sparkContext.textFile(filePath)
val fileWithIdRDD = fileRDD.zipWithIndex()

// goal: turn each line into (0, aaa), (0, bbb), (0, ccc), ...
// but each record comes out as an Array(Row)
fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  })
  rowArr
})
Now it looks like:
[(0, aaa), (0, bbb), (0, ccc)]
[(1, ddd), (1, eee), (1, fff), (1, ggg)]
But what I want in the end is:
(0, aaa)
(0, bbb)
(0, ccc)
(1, ddd)
(1, eee)
(1, fff)
(1, ggg)
Answer:
You just need to flatten your RDD:
yourRDD.flatMap(array => array)
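For instance, on a toy RDD of arrays (hypothetical data, just to show the flattening):

scala> sc.parallelize(Seq(Array(1, 2), Array(3, 4, 5))).flatMap(array => array).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5)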
Considering your code (with some errors fixed: inside the inner map and in the assignment of id and str):
fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  })
  rowArr
}).flatMap(array => array)
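Equivalently, you can skip the intermediate map and do the split and flatten in a single flatMap (a sketch with the same semantics, one transformation less):

fileWithIdRDD.flatMap { case (id, str) =>
  str.split("\\|").map(y => Row(id, y))
}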
Quick example here:
INPUT
scala> fileWithIdRDD.collect
res30: Array[(Int, String)] = Array((0,aaa|bbb|ccc), (1,ddd|eee|fff|ggg))
EXECUTION
scala> fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  })
  rowArr
}).flatMap(array => array)
res31: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at flatMap at <console>:35
OUTPUT
scala> res31.collect
res32: Array[org.apache.spark.sql.Row] = Array([0,aaa], [0,bbb], [0,ccc], [1,ddd], [1,eee], [1,fff], [1,ggg])
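If the goal is statistics with DataFrames, the flattened RDD[Row] can be given a schema. A minimal sketch, assuming placeholder column names id and value (the demo's ids are Int; with zipWithIndex they would be Long, so use LongType in that case):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),    // LongType if ids come from zipWithIndex
  StructField("value", StringType, nullable = false)
))
val df = spark.createDataFrame(res31, schema)  // res31 is the flattened RDD[Row] above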