rohan

Reputation: 53

Best approach to transform Dataset[Row] to RDD[Array[String]] in Spark-Scala?

I am creating a Spark Dataset by reading a CSV file. I then need to transform this Dataset[Row] into an RDD[Array[String]] to pass it to FPGrowth (Spark MLlib).

val df: DataFrame = spark.read.format("csv").option("header", "true").load("/path/to/csv")
val ds: Dataset[Row] = df.groupBy("user").agg(collect_set("values").as("values"))

Now, I need to select the column "values" and transform the resulting Dataset to RDD[Array[String]].

val rddS: RDD[String] = ds.select(concat_ws(",", col("values")).as("items")).distinct().rdd.map(_.mkString(","))
val rddArray: RDD[Array[String]] = rddS.map(s => s.trim.split(','))

I tried this approach, but I am not sure it is the best way. Please suggest an optimal way of achieving this.

Upvotes: 3

Views: 1020

Answers (3)

rohan

Reputation: 53

I ended up using the getSeq approach:

val rddArray: RDD[Array[String]] = ds.select("values").rdd.map(r => r.getSeq[String](0).toArray)

This was faster for my use case.
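
For reference, a minimal sketch of feeding this RDD into the RDD-based FPGrowth; the minSupport and numPartitions values are placeholders, not from the question:

import org.apache.spark.mllib.fpm.FPGrowth

// placeholder thresholds; tune for your data
val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10)
// run expects RDD[Array[Item]], which rddArray satisfies
val model = fpg.run(rddArray)
model.freqItemsets.collect().foreach { itemset =>
  println(s"${itemset.items.mkString("[", ",", "]")}: ${itemset.freq}")
}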

Upvotes: 1

koiralo

Reputation: 23119

Why not simply use the following? You avoid the extra concat_ws and split operations.

import scala.collection.mutable

// the array column comes back as a WrappedArray (on Scala 2.12)
val rddS: RDD[Array[String]] = ds.select("values")
    .distinct()
    .rdd.map(r => r.getAs[mutable.WrappedArray[String]](0).toArray)

Upvotes: 0

mck

Reputation: 42422

One-liner:

import spark.implicits._  // needed for the Array[String] encoder
val rddArray: RDD[Array[String]] = ds.select("values").as[Array[String]].rdd

By the way, I'd suggest using the DataFrame-based Spark ML instead of the RDD-based Spark MLlib, which is now in maintenance mode. You can use org.apache.spark.ml.fpm.FPGrowth.
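
A minimal sketch of the DataFrame-based API, assuming the aggregated column is named "values" (the minSupport and minConfidence values are illustrative); it consumes the array column directly, so no RDD conversion is needed at all:

import org.apache.spark.ml.fpm.FPGrowth

// illustrative thresholds; tune for your data
val fpgrowth = new FPGrowth()
  .setItemsCol("values")
  .setMinSupport(0.2)
  .setMinConfidence(0.6)
val model = fpgrowth.fit(ds.select("values"))

model.freqItemsets.show()
model.associationRules.show()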

Upvotes: 5
