Reputation: 53
I am creating a Spark Dataset by reading a CSV file. I then need to transform this Dataset[Row] into an RDD[Array[String]] to pass it to FPGrowth (Spark MLlib).
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._

val df: DataFrame = spark.read.format("csv").option("header", "true").load("/path/to/csv")
// alias the aggregated column so it can be referenced as "values" below
val ds: Dataset[Row] = df.groupBy("user").agg(collect_set("values").as("values"))
Now I need to select the "values" column and transform the resulting Dataset into an RDD[Array[String]].
// Join each array into a comma-separated string, dedupe, then split it back apart
val rddS: RDD[String] = ds.select(concat_ws(",", col("values")).as("items")).distinct().rdd.map(_.mkString(","))
val rddArray: RDD[Array[String]] = rddS.map(s => s.trim.split(','))
I tried out this approach, but I'm not sure it's the best way. Please suggest an optimal way of achieving this.
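For reference, this is roughly how I plan to feed the result to FPGrowth (the minSupport / numPartitions values below are just placeholders):

import org.apache.spark.mllib.fpm.FPGrowth

val fpg = new FPGrowth()
  .setMinSupport(0.2)   // placeholder value
  .setNumPartitions(10) // placeholder value

// FPGrowth expects transactions with no duplicate items;
// collect_set already dedupes, so rddArray is safe to pass in
val model = fpg.run(rddArray)
model.freqItemsets.collect().foreach { itemset =>
  println(s"${itemset.items.mkString("[", ",", "]")}: ${itemset.freq}")
}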
Upvotes: 3
Views: 1020
Reputation: 53
I ended up using the getSeq approach:
val rddArray: RDD[Array[String]] = ds.select("values").rdd.map(r => r.getSeq[String](0).toArray)
This was more efficient (faster) for my use case, since it reads the array column directly instead of round-tripping through a concatenated string.
Upvotes: 1
Reputation: 23119
Why not simply use the below? You avoid the extra concat_ws and split operations.
import scala.collection.mutable

val rddS: RDD[Array[String]] = ds.select("values")
  .distinct()
  .rdd.map(r => r.getAs[mutable.WrappedArray[String]](0).toArray)
Upvotes: 0
Reputation: 42422
One-liner:
import spark.implicits._ // provides the encoder for Array[String]

val rddArray: RDD[Array[String]] = ds.select("values").as[Array[String]].rdd
By the way, I'd suggest using the DataFrame-based Spark ML instead of the RDD-based Spark MLlib, which is now in maintenance mode. You can use org.apache.spark.ml.fpm.FPGrowth.
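For example, here's a minimal sketch, assuming the aggregated column is named "values" as above (the minSupport / minConfidence values are placeholders):

import org.apache.spark.ml.fpm.FPGrowth

// Works directly on the Dataset -- no RDD conversion needed
val fpgrowth = new FPGrowth()
  .setItemsCol("values")
  .setMinSupport(0.2)   // placeholder value
  .setMinConfidence(0.6) // placeholder value

val model = fpgrowth.fit(ds.select("values"))
model.freqItemsets.show()
model.associationRules.show()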
Upvotes: 5