Reputation: 759
I have about 18 Datasets, each of which contains 10 different columns and 1k~10k rows, and I have to left-join all of them one by one.
When I do:
val b = a.join(A, Seq("one column"), "left_outer").distinct()
val c = b.join(B, Seq("one column"), "left_outer").distinct()
val d = c.join(C, Seq("one column"), "left_outer").distinct()
...
val n = m.join(M, Seq("one column"), "left_outer").distinct()
n.write()
It works, but then I would like to optimize the code like this:
val data = List(a, A, B, C, ..., N, M)
val joined = data.reduce((left, right) => left.join(right, Seq("one column"), "left_outer").distinct())
val result = joined.distinct() // this one works
result.write() // this one doesn't work
18/02/28 09:54:35 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_45_90 !
18/02/28 09:54:35 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_108_191 !
[Stage 63:> (1 + 4) / 200]18/02/28 09:54:38 WARN TaskSetManager: Lost task 4.1 in stage 63.3 (TID 12686, 127.0.0.1, executor 8): FetchFailed(null, shuffleId=11, mapId=-1, reduceId=4, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1729)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 44 more
Upvotes: 1
Views: 614
Reputation: 759
My second solution, which didn't work at first, becomes a working one with a small modification: broadcasting the right-hand side of each join. Since each dataset is small (1k~10k rows), broadcasting it turns the shuffle-based joins into broadcast joins, which sidesteps the missing shuffle outputs seen above.
import org.apache.spark.sql.functions.broadcast

val data = List(a, A, B, C, ..., N, M)
val joined = data.reduce((left, right) => left.join(broadcast(right), Seq("one column"), "left_outer").distinct())
val result = joined.distinct() // this one works
result.write() // this one works now
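If you want to confirm that the broadcast hint is actually being applied, inspecting the physical plan is a quick check (a minimal sketch; `joined` is the value built above, and seeing BroadcastHashJoin instead of SortMergeJoin is the sign the hint took effect):
// Print the physical plan of the joined Dataset
joined.explain()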
Upvotes: 1
Reputation: 41957
In Scala, foldLeft is a very elegant function that can be used to meet your requirement.
I have three dataframes here, whereas you have 18.
a
+---+-----+
| id|value|
+---+-----+
| 0| 2|
| 1| 5|
| 2| 1|
| 3| 3|
| 4| 8|
+---+-----+
root
|-- id: string (nullable = true)
|-- value: string (nullable = true)
A
+---+-----+
| id| name|
+---+-----+
| 1|Cliff|
| 2| Raj|
| 3| Alim|
| 4| Jose|
| 5| Jeff|
+---+-----+
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
B
+---+------+
| id|number|
+---+------+
| 1| 100|
| 2| 200|
| 3| 300|
| 4| 400|
| 5| 500|
+---+------+
root
|-- id: string (nullable = true)
|-- number: integer (nullable = false)
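If you want to reproduce these toy dataframes locally, here is one way they could be built (a minimal sketch; it assumes an existing SparkSession named spark, and only the column names and values come from the tables above):
import spark.implicits._

// Toy data matching the tables above (id and value as strings, number as integer)
val a = Seq(("0", "2"), ("1", "5"), ("2", "1"), ("3", "3"), ("4", "8")).toDF("id", "value")
val A = Seq(("1", "Cliff"), ("2", "Raj"), ("3", "Alim"), ("4", "Jose"), ("5", "Jeff")).toDF("id", "name")
val B = Seq(("1", 100), ("2", 200), ("3", 300), ("4", 400), ("5", 500)).toDF("id", "number")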
Your working way
You are joining in a serialized manner, as follows:
val joined = a.join(A, Seq("id"), "leftouter").join(B, Seq("id"), "leftouter")
which should give you
+---+-----+-----+------+
|id |value|name |number|
+---+-----+-----+------+
|0 |2 |null |null |
|1 |5 |Cliff|100 |
|2 |1 |Raj |200 |
|3 |3 |Alim |300 |
|4 |8 |Jose |400 |
+---+-----+-----+------+
root
|-- id: string (nullable = true)
|-- value: string (nullable = true)
|-- name: string (nullable = true)
|-- number: integer (nullable = true)
foldLeft way
Now, if we do the same with the foldLeft function:
val data = List(A, B)
val joined = data.foldLeft(a)((left, right) => left.join(right, Seq("id"), "leftouter"))
You should get the same result. I hope the answer is helpful for your needs.
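To map this back to your 18 datasets, the same pattern scales by just extending the list; a rough sketch (the dataset names, the "id" column, and the output path/format are placeholders):
// All remaining datasets to be left-joined onto `a`, in order
val others = List(A, B /*, C, ..., M */)

// Fold each one in with a left outer join; the .distinct() mirrors the
// de-duplication done in the question
val joined = others.foldLeft(a) { (left, right) =>
  left.join(right, Seq("id"), "leftouter").distinct()
}

joined.write.mode("overwrite").parquet("/tmp/joined_output")  // placeholder path and format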
Upvotes: 0