J.Doe

Reputation: 183

Converting csv RDD to map

I have a large CSV (> 500 MB) that I read into a Spark RDD, and I want to store it in a large Map[String, Array[Long]]. The CSV has multiple columns, but for now I only need the first two, which are of the form:

A 12312 [some_value] ....
B 123123 [some_value] ....
A 1222 [some_value] ....
C 1231 [some_value] ....

I want my map to group by the string and store an array of longs, so for the above case my map would be: {"A": [12312, 1222], "B": [123123], "C": [1231]}
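To make the target structure concrete (using only the example values above), this is the shape I am after in Scala, with every value stored as an Array[Long]:

val target: Map[String, Array[Long]] = Map(
  "A" -> Array(12312L, 1222L),   // two rows share the key A
  "B" -> Array(123123L),
  "C" -> Array(1231L)
)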

But since this map would be huge, I can't simply build it directly.

I read the CSV into a sql.DataFrame.

My code so far (it looks incorrect, though):

def getMap(df: sql.DataFrame, sc: SparkContext): RDD[Map[String, Array[Long]]] = {
  var records = sc.emptyRDD[Map[String, Array[Long]]]
  val rows: RDD[Row] = df.rdd
  rows.foreachPartition(iter => {
    iter.foreach(x =>
      if (records.contains(x.get(0).toString)) {
        val arr = temp_map.getOrElse()
        records = records + (x.get(0).toString -> (temp_map.getOrElse(x.get(0).toString) :+ x.get(1).toString.toLong))
      }
      else {
        val arr = new Array[Long](1)
        arr(0) = x.get(1).toString.toLong
        records = records + (x.get(0).toString -> arr)
      }
    )
  })
}

Thanks in advance!

Upvotes: 0

Views: 205

Answers (1)

koiralo

Reputation: 23099

If I understood your question correctly, then:

You could groupBy the first column and collect_list the second column:

import org.apache.spark.sql.functions._
val newDF = df.groupBy("column1").agg(collect_list("column2"))
newDF.show(false)
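One thing to check (an assumption about how the file was loaded): if the CSV was read without a schema, column2 will come back as a string, so you would cast it to long before collecting, e.g. (col comes from the same functions import above):

// cast column2 so collect_list gathers longs instead of strings
val newDF = df.groupBy("column1").agg(collect_list(col("column2").cast("long")))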

val rdd = newDF.rdd.map(r => (r.getString(0), r.getAs[Seq[Long]](1)))

This will give you an RDD[(String, Seq[Long])] where each string key is unique.
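If you then really need a driver-side Map[String, Array[Long]] as in the question, a minimal sketch would be the following (keep in mind that collecting a > 500 MB dataset to the driver may not fit in memory):

val asMap: Map[String, Array[Long]] = rdd
  .mapValues(_.toArray)   // Seq[Long] -> Array[Long]
  .collectAsMap()         // pulls all keys and values to the driver
  .toMap

At this data size, keeping the data as the pair RDD (or the grouped DataFrame) and only collecting what you actually need is usually the safer option.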

Upvotes: 1
