Xiang Liu

Reputation: 43

How to efficiently group every k rows in a Spark Dataset?

I created a Spark Dataset[Row], where each Row is Row(x: Vector). x here is a 1 x p vector.

Is it possible to 1) group every k rows and 2) concatenate these rows into a k x p matrix mX, i.e., change Dataset[Row(Vector)] to Dataset[Row(Matrix)]?

Here is my current solution: convert this Dataset[Row] to an RDD, and concatenate every k rows with zipWithIndex and aggregateByKey.

val dataRDD = data_df.rdd.zipWithIndex
    .map { case (line, index) => (index / k, line) }
    .aggregateByKey(...)(..., ...)
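
For illustration, the aggregation could be filled in along these lines (a sketch only; groupByKey stands in for aggregateByKey for brevity, the vector is assumed to be column 0, and ml.linalg.DenseMatrix is used for the matrix type):

import org.apache.spark.ml.linalg.{DenseMatrix, Vector}

// Illustrative sketch: k rows per group, vector assumed to be column 0
val k = 4
val matrices = data_df.rdd.zipWithIndex
  .map { case (row, index) => (index / k, row.getAs[Vector](0)) }
  .groupByKey()
  .mapValues { vecs =>
    val rows = vecs.toArray              // up to k vectors, each of length p
    val p = rows.head.size
    // DenseMatrix stores values column-major, so lay them out accordingly
    val values = Array.ofDim[Double](rows.length * p)
    for (i <- rows.indices; j <- 0 until p)
      values(j * rows.length + i) = rows(i)(j)
    new DenseMatrix(rows.length, p, values)
  }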

But it doesn't seem very efficient. Is there a more efficient way to do this?

Thanks in advance.

Upvotes: 1

Views: 2029

Answers (2)

Ged

Reputation: 18043

Here is a solution that groups N records into columns:

Generate a DataFrame from the RDD and process it as shown below.

Here g is the group, k is the record number within the group (it restarts for each g), and v is the record content.

The input is a file of 6 lines, and I used groups of 3 here.

The only drawback is when the number of lines leaves a remainder of fewer than N lines for the last group.

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.rdd.RDDFunctions._
import spark.implicits._   // for toDF and the $ column syntax (pre-imported in spark-shell)

val dfsFilename = "/FileStore/tables/7dxa9btd1477497663691/Text_File_01-880f5.txt"
val readFileRDD = spark.sparkContext.textFile(dfsFilename)
// Non-overlapping windows of 3 lines, each window indexed -> the group g
val rdd2 = readFileRDD.sliding(3, 3).zipWithIndex
// Index the lines inside each window -> the per-group key k
val rdd3 = rdd2.map(r => (r._1.zipWithIndex, r._2))
val df = rdd3.toDF("vk", "g")

// One row per (value, key) pair, then split the struct into v and k columns
val df2 = df.withColumn("vke", explode($"vk")).drop("vk")
val df3 = df2.withColumn("k", $"vke._2").withColumn("v", $"vke._1").drop("vke")

// Pivot the per-group keys into columns, one row per group
val result = df3
  .groupBy("g")
  .pivot("k")
  .agg(expr("first(v)"))

result.show()

returns:

+---+--------------------+--------------------+--------------------+
|  g|                   0|                   1|                   2|
+---+--------------------+--------------------+--------------------+
|  0|The quick brown f...|Here he lays I te...|Gone are the days...|
|  1|  Gosh, what to say.|Hallo, hallo, how...|          I am fine.|
+---+--------------------+--------------------+--------------------+

Upvotes: 0

Sim

Reputation: 13538

There are two performance issues with your approach:

  1. Using a global ordering
  2. Doing a shuffle to build the groups of k

If you absolutely need a global ordering, starting from line 1, and you cannot break up your data into multiple partitions, then Spark has to move all the data through a single core. You can speed that part up by finding a way to have more than one partition.

You can avoid a shuffle by processing the data one partition at a time using mapPartitions:

spark.range(1, 20).coalesce(1).mapPartitions(_.grouped(5)).show

+--------------------+
|               value|
+--------------------+
|     [1, 2, 3, 4, 5]|
|    [6, 7, 8, 9, 10]|
|[11, 12, 13, 14, 15]|
|    [16, 17, 18, 19]|
+--------------------+

Note that coalesce(1) above is forcing all 19 rows produced by range(1, 20) into a single partition.
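
As a sketch (not from the answer itself), the same mapPartitions idea could be applied to the Vector-to-Matrix problem from the question; the column position, the value of k, and the use of ml.linalg.DenseMatrix are assumptions for illustration:

import org.apache.spark.ml.linalg.{DenseMatrix, Vector}

val k = 4  // group size, assumed
val matrices = data_df.rdd
  .map(_.getAs[Vector](0))                   // pull out the vector column
  .mapPartitions(_.grouped(k).map { vecs =>
    val p = vecs.head.size
    // DenseMatrix is column-major, so fill the values array accordingly
    val values = Array.ofDim[Double](vecs.length * p)
    for (i <- vecs.indices; j <- 0 until p)
      values(j * vecs.length + i) = vecs(i)(j)
    new DenseMatrix(vecs.length, p, values)   // up-to-k x p matrix, no shuffle
  })

Each partition is chunked independently, so the last group in each partition may hold fewer than k rows, and the grouping does not follow a single global order across partitions.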

Upvotes: 1
