Gismo Ranas

Reputation: 6432

Spark RDD to CSV - Add empty columns

I have an RDD[Map[String,Int]] where the keys of the maps are the column names. Each map is incomplete, and to know all the column names I would need to union all the keys. Is there a way to avoid a collect operation just to learn all the keys, and to get the csv with a single rdd.saveAsTextFile(..)?

For example, say I have an RDD with two elements (Scala notation):

Map("a"->1, "b"->2)
Map("b"->1, "c"->3)

I would like to end up with this csv:

a,b,c
1,2,0
0,1,3

Scala solutions are better but any other Spark-compatible language would do.

EDIT:

I could also try to solve my problem from another direction. Let's say I somehow know all the columns in the beginning, but I want to get rid of columns that have a 0 value in all maps. So the problem becomes: I know that the keys are ("a", "b", "c"), and from this:

Map("a"->1, "b"->2, "c"->0)
Map("a"->3, "b"->1, "c"->0)

I need to write the csv:

a,b
1,2
3,1

Would it be possible to do this with only one collect?

Upvotes: 0

Views: 997

Answers (2)

zero323

Reputation: 330073

Scala and any other supported language

You can use spark-csv.

First, let's find all the columns that are present:

val cols = sc.broadcast(rdd.flatMap(_.keys).distinct().collect())
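Note that distinct().collect() gives no particular ordering, so the columns may come back in any order. If you want a deterministic column order in the output (like a,b,c in the question), one option is to sort the collected keys; a minor variation:

// Same as above, but with an alphabetical column order
val cols = sc.broadcast(rdd.flatMap(_.keys).distinct().collect().sorted)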

Create RDD[Row]:

import org.apache.spark.sql.Row

val rows = rdd.map { row =>
  // Fill in 0 for every column the current map is missing
  Row.fromSeq(cols.value.map(row.getOrElse(_, 0)))
}

Prepare schema:

import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val schema = StructType(
    cols.value.map(field => StructField(field, IntegerType, true)))

Convert the RDD[Row] to a DataFrame:

val df = sqlContext.createDataFrame(rows, schema)

Write results:

// Spark 1.4+, for other versions see spark-csv docs
df.write.format("com.databricks.spark.csv").save("mycsv.csv")
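If you also want the header row (a,b,c) in the output, spark-csv supports a header option on write (see its docs); a small variation of the line above:

df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("mycsv.csv")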

You can do pretty much the same thing using other supported languages.

Python

If you use Python and the final data fits in driver memory, you can use Pandas through the toPandas() method:

rdd = sc.parallelize([{'a': 1, 'b': 2}, {'b': 1, 'c': 3}])
cols = sc.broadcast(rdd.flatMap(lambda row: row.keys()).distinct().collect())

df = sqlContext.createDataFrame(
    rdd.map(lambda row: {k: row.get(k, 0) for k in cols.value}))

df.toPandas().to_csv('mycsv.csv', index=False)

or directly:

import pandas as pd
pd.DataFrame(rdd.collect()).fillna(0).to_csv('mycsv.csv', index=False)

Edit

One possible way to avoid the second collect is to use accumulators, either to build a set of all column names or to count the columns where you found zeros, and then use that information to map over the rows and drop unnecessary columns or add zeros.

It is possible but inefficient and feels like cheating. The only situation where it makes some sense is when the number of zeros is very low, but I guess that is not the case here.

object ColsSetParam extends AccumulatorParam[Set[String]] {

  def zero(initialValue: Set[String]): Set[String] = {
    Set.empty[String]
  }

  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = {
    s1 ++ s2
  }
}

val colSetAccum = sc.accumulator(Set.empty[String])(ColsSetParam)
rdd.foreach { m => colSetAccum += m.keys.toSet }
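Once an action has been executed, the accumulated set is available on the driver; a short usage note (the val name is just illustrative):

// Driver side, after the foreach above
val allKeys: Set[String] = colSetAccum.value // e.g. Set("a", "b", "c")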

or

// We assume you know this upfront
val allColnames = sc.broadcast(Set("a", "b", "c"))

object ZeroColsParam extends AccumulatorParam[Map[String, Int]] {

  def zero(initialValue: Map[String, Int]): Map[String, Int] = {
    Map.empty[String, Int]
  }

  def addInPlace(m1: Map[String, Int], m2: Map[String, Int]): Map[String, Int] = {
    val keys = m1.keys ++ m2.keys
    keys.map(
      (k: String) => (k -> (m1.getOrElse(k, 0) + m2.getOrElse(k, 0)))).toMap
  }
}

val accum = sc.accumulator(Map.empty[String, Int])(ZeroColsParam)

rdd.foreach { row =>
  // If allColnames.value -- row.keys.toSet is empty we can avoid this part
  accum += (allColnames.value -- row.keys.toSet).map(x => (x -> 1)).toMap
}
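To actually use those counts you would read them back on the driver; a rough sketch under the assumption that a column should be dropped when it was absent (or zero) in every row, with totalRows obtained from a separate count() or a Long accumulator in the same pass:

// Assumption: accum.value maps a column name to the number of rows where it was absent/zero
val totalRows = rdd.count() // hypothetical; could be tracked with a Long accumulator instead
val colsToKeep = allColnames.value.toSeq
  .filter(c => accum.value.getOrElse(c, 0) < totalRows)
  .sorted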

Upvotes: 1

Iulian Dragos

Reputation: 5712

If your statement is "every new element in my RDD may add a new column name I have not seen so far", then obviously you can't avoid a full scan. But you don't need to collect all the elements on the driver.

You could use aggregate to collect only the column names. This method takes two functions: one inserts a single element into the resulting collection, and the other merges results from two different partitions.

rdd.aggregate(Set.empty[String])( {(s, m) => s union m.keySet }, { (s1, s2) => s1 union s2 })

You will get back a set of all the column names in the RDD. In a second scan you can then write the CSV file.
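A possible sketch of that second scan, assuming the aggregated names are sorted into a fixed order and broadcast, missing keys are filled with 0, and the header is prepended as a one-line RDD (the output path is illustrative):

val colNames = rdd
  .aggregate(Set.empty[String])((s, m) => s union m.keySet, (s1, s2) => s1 union s2)
  .toSeq.sorted
val colsBc = sc.broadcast(colNames)

// Turn each map into one CSV line, filling absent keys with 0
val dataLines = rdd.map(m => colsBc.value.map(c => m.getOrElse(c, 0)).mkString(","))

// Prepend the header as a single-line RDD and write everything with one saveAsTextFile
val header = sc.parallelize(Seq(colNames.mkString(",")))
(header union dataLines).saveAsTextFile("mycsv")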

Upvotes: 2
