Reputation: 817
I am reading in a file that has many spaces and need to filter out the spaces. Afterwards we need to convert it to a DataFrame. Example input below.
2017123 ¦ ¦10¦running¦00000¦111¦-EXAMPLE
My solution to this was the following function which parses out all spaces and trims the file.
import org.apache.spark.rdd.RDD

// Remove all tabs and horizontal whitespace from each line of the file.
def truncateRDD(fileName: String): RDD[String] = {
  val example = sc.textFile(fileName)
  example.map(lines => lines.replaceAll("""[\t\p{Zs}]+""", ""))
}
However, I am not sure how to get it into a DataFrame. sc.textFile returns an RDD[String]. I tried the case class way, but the issue is that we have an 800-field schema, and a case class cannot go beyond 22 fields. I was thinking of somehow converting the RDD[String] to an RDD[Row] so I can use the createDataFrame function.
val DF = spark.createDataFrame(rowRDD, schema)
Any suggestions on how to do this?
Upvotes: 3
Views: 16243
Reputation: 29145
In your case, a simple way:

import org.apache.spark.sql.Row

// Split each cleaned line on the ¦ delimiter so each Row gets one entry per
// field (passing the raw String to Row.fromSeq would yield one entry per
// character).
val rowRDD = truncateRDD("yourfilename").map(r => Row.fromSeq(r.split("¦")))

Is it a productArity issue because you are using Scala 2.10? Yes, there are some limitations like productArity, but we can overcome them...
You can do it like the example below for Scala versions < 2.11: prepare a class which extends Product and overrides the following methods.
productArity(): Int
This returns the number of attributes. In our case, it's 33. Our implementation looks like the sketch below this list.

productElement(n: Int): Any
Given an index, this returns the attribute. As protection, we also have a default case, which throws an IndexOutOfBoundsException.

canEqual(that: Any): Boolean
This is the last of the three functions, and it serves as a boundary condition when an equality check is being done against the class.
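The original snippets were not preserved here, so the following is only a minimal sketch of the pattern, using a hypothetical three-field record for brevity (the same approach extends to 33 or 800 fields):

class Record(val id: String, val count: String, val status: String)
  extends Product with Serializable {

  // Number of attributes in the record.
  override def productArity: Int = 3

  // Return the attribute at the given index; the default case protects
  // against out-of-range indexes.
  override def productElement(n: Int): Any = n match {
    case 0 => id
    case 1 => count
    case 2 => status
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  // Boundary condition for equality checks against this class.
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Record]
}

An RDD of such objects should then work with sqlContext.createDataFrame directly, since Spark can reflect over Product types.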
Upvotes: 0
Reputation: 8519
First, split/parse your strings into the fields:

rdd.map(line => parse(line))

where parse is some parsing function. It could be as simple as split, but you may want something more robust. This will get you an RDD[Array[String]] or similar.

You can then convert to an RDD[Row] with:

rdd.map(a => Row.fromSeq(a))

From there you can convert to a DataFrame using sqlContext.createDataFrame(rdd, schema), where rdd is your RDD[Row] and schema is your StructType schema.
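Putting those steps together, a minimal end-to-end sketch, assuming the ¦-delimited format from the question and treating every field as a string (the column names and the seven-field count taken from the sample line are assumptions; the real schema would have 800 fields):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Parse each cleaned line into fields on the ¦ delimiter; the -1 limit keeps
// trailing empty fields so every row has the same arity as the schema.
val parsed: RDD[Array[String]] = truncateRDD("yourfilename").map(_.split("¦", -1))

// Wrap each field array in a Row.
val rowRDD: RDD[Row] = parsed.map(a => Row.fromSeq(a))

// Build the schema programmatically, which is practical for a wide schema.
// Hypothetical column names; all fields kept as strings for simplicity.
val schema = StructType((1 to 7).map(i => StructField(s"col$i", StringType, nullable = true)))

val df = sqlContext.createDataFrame(rowRDD, schema)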
Upvotes: 8