Reputation: 2342
I have built a Spark RDD where each element is a JAXB root element representing an XML record.
I want to split this RDD to produce 6 RDDs from this set. Essentially the job just converts the hierarchical XML structure into 6 sets of flat CSV records. I am currently passing over the same RDD six times to do this.
xmlRdd.cache()
val rddOfTypeA = xmlRdd.map { iterate over XML Object and create Type A }
rddOfTypeA.saveAsTextFile("s3://...")
val rddOfTypeB = xmlRdd.map { iterate over XML Object and create Type B }
rddOfTypeB.saveAsTextFile("s3://...")
val rddOfTypeC = xmlRdd.map { iterate over XML Object and create Type C }
rddOfTypeC.saveAsTextFile("s3://...")
val rddOfTypeD = xmlRdd.map { iterate over XML Object and create Type D }
rddOfTypeD.saveAsTextFile("s3://...")
val rddOfTypeE = xmlRdd.map { iterate over XML Object and create Type E }
rddOfTypeE.saveAsTextFile("s3://...")
val rddOfTypeF = xmlRdd.map { iterate over XML Object and create Type F }
rddOfTypeF.saveAsTextFile("s3://...")
My input dataset is 35 million records split into 186 files of 448 MB each, stored in Amazon S3. My output directory is also on S3. I am using Spark on EMR.
With a six-node m4.4xlarge cluster it takes 38 minutes to finish this splitting and write the output.
Is there an efficient way to achieve this without walking over the RDD six times?
Upvotes: 2
Views: 173
Reputation: 1
Depending on your requirements regarding the output paths, you can solve it with a simple partitionBy clause and the standard DataFrameWriter.

Instead of multiple maps, design a function which takes an element of xmlRdd and returns a Seq of tuples. The general structure would be like this:
def extractTypes(value: T): Seq[(String, String)] = {
  val a: String = extractA(value)
  val b: String = extractB(value)
  ...
  val f: String = extractF(value)
  Seq(("A", a), ("B", b), ..., ("F", f))
}
Then flatMap, convert to a Dataset, and write:
xmlRdd.flatMap(extractTypes _).toDF("id", "value").write
  .partitionBy("id")
  .option("escapeQuotes", "false")
  .csv(...)
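To make the sketch concrete, here is a hedged example of one extract helper. The JAXB class RecordRoot and its getId/getName accessors are assumptions, so adapt it to the real generated classes; the point is that each helper flattens one record type into a single CSV line:

def extractA(value: RecordRoot): String =
  // assumption: getId and getName are the JAXB-generated accessors on the real class
  Seq(value.getId, value.getName).mkString(",")

With partitionBy("id") the writer creates one subdirectory per type under the output path (id=A/, id=B/, ..., id=F/), so each record type still ends up in its own set of CSV files while the input is scanned only once.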
Upvotes: 0
Reputation: 74679
The easiest solution (from a Spark developer's perspective) is to do the map and saveAsTextFile per RDD on a separate thread.

What's not widely known (and hence exploited) is the fact that SparkContext is thread-safe and so can be used to submit jobs from separate threads.

With that said, you could do the following (using the simplest Scala solution with Future, which is not necessarily the best, since a Future starts its computation at instantiation time, not when you say so):
xmlRdd.cache()

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val f1 = Future {
  val rddOfTypeA = xmlRdd.map { map xml to csv }
  rddOfTypeA.saveAsTextFile("s3://...")
}

val f2 = Future {
  val rddOfTypeB = xmlRdd.map { map xml to csv }
  rddOfTypeB.saveAsTextFile("s3://...")
}

...

Future.sequence(Seq(f1, f2)).onComplete { ... }
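Note that onComplete only registers a callback; if this runs in a driver's main method you usually want to block until the writes finish, otherwise the application can exit before the jobs complete. A minimal sketch, reusing the f1, f2, ... futures above:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// block the driver until every save job has finished (or one of them fails)
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)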
That could cut the time for doing the mapping and saving, but it would not cut the number of scans over the dataset. That should not be a big deal anyway, since the dataset is cached and hence in memory and/or on disk (the default persistence level is MEMORY_AND_DISK in Spark SQL's Dataset.cache).
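Also note that xmlRdd here is a plain RDD, so its cache() defaults to MEMORY_ONLY rather than MEMORY_AND_DISK; if the deserialized XML objects do not all fit in memory, you can request an explicit storage level instead. A small sketch:

import org.apache.spark.storage.StorageLevel

// spill partitions that do not fit in memory to local disk instead of recomputing them
xmlRdd.persist(StorageLevel.MEMORY_AND_DISK)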
Upvotes: 5