user1124702

Reputation: 1135

Converting Scala mutable arrays to a Spark DataFrame

I have three mutable arrays defined as:

import scala.collection.mutable.ArrayBuffer
var quartile_1 = ArrayBuffer[Double]()
var quartile_3 = ArrayBuffer[Double]()
var id = ArrayBuffer[String]()

quartile_1 and quartile_3 hold information at the id level, and I am currently computing them as follows:

def func1(x: org.apache.spark.sql.Row): Unit = {
  val apQuantile = df_auth_for_qnt.where($"id" === x(0).toString).stat.approxQuantile("tran_amt", Array(0.25, 0.75), 0.001)
  quartile_1 += apQuantile(0)
  quartile_3 += apQuantile(1)
  id += x(0).toString()
}

df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1").collect.foreach(func1)

Is there a better approach than appending to mutable arrays? My goal is to have the quantile data and the ids available as a DataFrame, so that I can do further joins.

Upvotes: 1

Views: 2927

Answers (1)

Antot

Reputation: 3964

Mutable structures like ArrayBuffer are evil, especially in a parallelizable context. Here they can be avoided quite easily.

func1 can return a tuple of (String, Array[Double]), where the first element corresponds to the id (the former id buffer) and the second element holds the quartiles returned by approxQuantile:

import org.apache.spark.sql.Row

// Requires `import spark.implicits._` in scope for the $-column syntax.
def func1(x: Row): (String, Array[Double]) = {
  val cardNum1 = x(0).toString
  // 0.25 -> first quartile, 0.75 -> third quartile, 0.001 is the relative error
  val quartiles = df_auth_for_qnt.where($"id" === cardNum1).stat.approxQuantile("tran_amt", Array(0.25, 0.75), 0.001)
  (cardNum1, quartiles)
}

Now, using functional chaining, we can obtain an immutable result structure. One caveat: func1 queries another DataFrame (df_auth_for_qnt), and Dataset transformations can only be invoked on the driver, so func1 cannot run inside a distributed map. The selected rows must be collected to the driver first, just as in the question.

As a DataFrame:

val resultDf = df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1").collect().map(func1).toSeq.toDF("id", "quartiles")
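
For the stated goal of further joins, resultDf can be used directly. A hypothetical example, assuming some other DataFrame df_other that also has an id column:

val joined = df_other.join(resultDf, Seq("id"))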

Or as a Map[String, Array[Double]] with the same associations as in the tuples returned from func1:

val resultMap = df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1").collect().map(func1).toMap
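
If the per-id filtering becomes a bottleneck, the whole computation can also stay distributed by grouping df_auth_for_qnt once and aggregating both quartiles per id. A minimal sketch, assuming Spark 3.1+ (where percentile_approx is available in org.apache.spark.sql.functions; on 2.1+ the same aggregate can be written via expr("percentile_approx(...)")) and the column names from the question:

import org.apache.spark.sql.functions._

// One pass over df_auth_for_qnt: both quartiles per id, no driver-side loop.
// Column names (id, tran_amt) come from the question; 10000 is the default accuracy.
val quartilesDf = df_auth_for_qnt
  .groupBy($"id")
  .agg(
    percentile_approx($"tran_amt", lit(0.25), lit(10000)).as("quartile_1"),
    percentile_approx($"tran_amt", lit(0.75), lit(10000)).as("quartile_3")
  )

The result has one row per id and can be joined further, just like resultDf above.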

Upvotes: 1
