Defcon

Reputation: 817

Convert RDD[String] to RDD[Row] to Dataframe Spark Scala

I am reading in a file that has many spaces and need to filter out the spaces. Afterwards, it needs to be converted to a DataFrame. Example input below.

2017123 ¦     ¦10¦running¦00000¦111¦-EXAMPLE

My solution to this was the following function, which strips all tabs and spaces from each line of the file.

import org.apache.spark.rdd.RDD

def truncateRDD(fileName : String): RDD[String] = {
    val example = sc.textFile(fileName)
    // remove all tabs and Unicode space characters from each line
    example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}

However, I am not sure how to get it into a DataFrame. sc.textFile returns an RDD[String]. I tried the case class approach, but the issue is that we have an 800-field schema, and a case class cannot go beyond 22 fields.

I was thinking of somehow converting RDD[String] to RDD[Row] so I can use the createDataFrame function.

val DF = spark.createDataFrame(rowRDD, schema)

Any suggestions on how to do this?

Upvotes: 3

Views: 16243

Answers (2)

Ram Ghadiyaram

Reputation: 29145

In your case, a simple way:

// split each line into its fields on ¦ before building the Row
val rowRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r.split("¦", -1)))

How to solve the product arity issue if you are using Scala 2.10?

However, I am not sure how to get it into a DataFrame. sc.textFile returns an RDD[String]. I tried the case class approach, but the issue is that we have an 800-field schema, and a case class cannot go beyond 22 fields.

Yes, there are some limitations, such as product arity, but they can be overcome. For Scala versions below 2.11, you can do something like the example below:

Prepare a class which extends Product and overrides the following methods (see the sketch after this list).

  • productArity(): Int: returns the number of attributes; in our case, the number of fields in the schema.

  • productElement(n: Int): Any: given an index, returns the attribute. As protection, a default case throws an IndexOutOfBoundsException for invalid indices.

  • canEqual(that: Any): Boolean: the last of the three functions; it serves as a boundary condition when an equality check is done against the class.
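
A minimal sketch of such a class, assuming the fields arrive as an Array[String] (the name WideRecord and the array-backed design are illustrative, not from the original answer):

class WideRecord(val fields: Array[String]) extends Product with Serializable {
    // number of attributes; for an 800-field schema this would be 800
    override def productArity: Int = fields.length

    // return the attribute at index n, with a default case for bad indices
    override def productElement(n: Int): Any = n match {
        case i if i >= 0 && i < fields.length => fields(i)
        case _ => throw new IndexOutOfBoundsException(n.toString)
    }

    // boundary condition for equality checks against this class
    override def canEqual(that: Any): Boolean = that.isInstanceOf[WideRecord]
}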


Upvotes: 0

puhlen

Reputation: 8519

First, split/parse your strings into fields.

rdd.map(line => parse(line)), where parse is some parsing function. It could be as simple as split, but you may want something more robust. This will get you an RDD[Array[String]] or similar.
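
For instance, a minimal parse for the ¦-delimited sample in the question might look like this (the delimiter and the trimming are assumptions based on the example input):

// hypothetical parser: split on ¦, keep trailing empty fields, trim whitespace
def parse(line: String): Array[String] =
    line.split("¦", -1).map(_.trim)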

You can then convert to an RDD[Row] with rdd.map(a => Row.fromSeq(a))

From there you can convert to a DataFrame using sqlContext.createDataFrame(rdd, schema), where rdd is your RDD[Row] and schema is your StructType schema.
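
Putting it all together, a minimal sketch (the col1..colN column names and the all-String types are placeholders; a real schema would name and type each of the 800 fields):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// generate the wide schema programmatically rather than by hand
val schema = StructType((1 to 800).map(i => StructField(s"col$i", StringType, nullable = true)))

// parse is the parsing function described above
val rowRDD = rdd.map(line => Row.fromSeq(parse(line)))

val df = sqlContext.createDataFrame(rowRDD, schema)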

Upvotes: 8
