Reputation: 43
I created a Spark Dataset[Row], where each Row is Row(x: Vector) and x is a 1 x p vector.
Is it possible to 1) group every k rows and 2) concatenate those rows into a k x p matrix mX, i.e., change Dataset[Row(Vector)] to Dataset[Row(Matrix)]?
Here is my current solution: convert the Dataset[Row] to an RDD, then concatenate every k rows with zipWithIndex and aggregateByKey.
val dataRDD = data_df.rdd.zipWithIndex
.map { case (line, index) => (index/k, line) }
.aggregateByKey(...) (..., ...)
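For concreteness, a filled-in sketch of that aggregation might look like this (assuming the vector column is named x and holds ml.linalg vectors; the variable names are just illustrative):
import org.apache.spark.ml.linalg.{DenseMatrix, Vector}
val k = 3                                         // group size (example value)
val matrixRDD = data_df.rdd
  .map(_.getAs[Vector]("x"))                      // pull out the vector column
  .zipWithIndex
  .map { case (v, i) => (i / k, (i, v)) }         // key = group id; keep the index to restore order
  .aggregateByKey(List.empty[(Long, Vector)])(
    (acc, iv) => iv :: acc,                       // add one (index, vector) pair to this group
    (a, b) => a ::: b                             // merge partial groups from different partitions
  )
  .mapValues { pairs =>
    val rows = pairs.sortBy(_._1).map(_._2)       // restore the original row order
    val p = rows.head.size
    // isTransposed = true reads the values array row-major, giving a k x p matrix
    new DenseMatrix(rows.length, p, rows.flatMap(_.toArray).toArray, isTransposed = true)
  }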
But it seems it's not very efficient, is there a more efficient way to do this?
Thanks in advance.
Upvotes: 1
Views: 2029
Reputation: 18043
Here is a solution that groups N records into columns.
Start from an RDD, convert it to a DataFrame, and process it as shown below.
Here g is the group, k is the record number within the group (it repeats from group to group), and v is your record content.
The input is a file of 6 lines, and I use groups of 3 here.
The only drawback is when the number of lines leaves a remainder smaller than the grouping N (an incomplete final group).
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.rdd.RDDFunctions._   // provides sliding on RDDs
import spark.implicits._                            // for toDF and $ (pre-imported in the shell/notebook)
val dfsFilename = "/FileStore/tables/7dxa9btd1477497663691/Text_File_01-880f5.txt"
val readFileRDD = spark.sparkContext.textFile(dfsFilename)
// group every 3 consecutive lines into an Array and number the groups: g
val rdd2 = readFileRDD.sliding(3, 3).zipWithIndex
// number the lines inside each group: k
val rdd3 = rdd2.map(r => (r._1.zipWithIndex, r._2))
val df = rdd3.toDF("vk", "g")
// explode to one row per line, then extract its in-group position k and content v
val df2 = df.withColumn("vke", explode($"vk")).drop("vk")
val df3 = df2.withColumn("k", $"vke._2").withColumn("v", $"vke._1").drop("vke")
// pivot so that each group g becomes one row with N columns (0, 1, 2)
val result = df3
  .groupBy("g")
  .pivot("k")
  .agg(expr("first(v)"))
result.show()
returns:
+---+--------------------+--------------------+--------------------+
| g| 0| 1| 2|
+---+--------------------+--------------------+--------------------+
| 0|The quick brown f...|Here he lays I te...|Gone are the days...|
| 1| Gosh, what to say.|Hallo, hallo, how...| I am fine.|
+---+--------------------+--------------------+--------------------+
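If you would rather have a single column per group than N separate columns, one option for this 3-column case might be to pack them into an array (array is from org.apache.spark.sql.functions; the column names 0, 1, 2 are the pivoted keys shown above):
val packed = result.select($"g", array($"0", $"1", $"2").as("vals"))
packed.show(false)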
Upvotes: 0
Reputation: 13538
There are two performance issues with your approach: the global ordering imposed by zipWithIndex, and the shuffle required by aggregateByKey.
If you absolutely need a global ordering, starting from line 1, and you cannot break up your data into multiple partitions, then Spark has to move all the data through a single core. You can speed that part up by finding a way to have more than one partition.
You can avoid the shuffle by processing the data one partition at a time using mapPartitions:
spark.range(1, 20).coalesce(1).mapPartitions(_.grouped(5)).show
+--------------------+
| value|
+--------------------+
| [1, 2, 3, 4, 5]|
| [6, 7, 8, 9, 10]|
|[11, 12, 13, 14, 15]|
| [16, 17, 18, 19]|
+--------------------+
Note that coalesce(1) above forces all 19 rows produced by range(1, 20) into a single partition.
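Applied to the question's data_df, the same partition-local grouping might look roughly like this (a sketch, assuming the vector column is named x and holds ml.linalg vectors; the short trailing group of each partition simply becomes a smaller matrix):
import org.apache.spark.ml.linalg.{DenseMatrix, Vector}
val k = 5                                   // group size, as in the example above
val matrices = data_df.rdd.mapPartitions { it =>
  it.map(_.getAs[Vector]("x"))              // pull out the vector column
    .grouped(k)                             // group every k consecutive rows within the partition
    .map { rows =>
      val p = rows.head.size
      // row-major values; isTransposed = true lays the k vectors out as matrix rows
      new DenseMatrix(rows.length, p, rows.flatMap(_.toArray).toArray, isTransposed = true)
    }
}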
Upvotes: 1