mingzhao.pro

Reputation: 759

Spark SQL: joining multiple Datasets fails

I have about 18 Datasets, each of which contains 10 different columns and 1k~10k rows, and I have to do a left join on all of them one by one.

When I do:

val b = a.join(A, Seq("one column"), "left_outer").distinct()
val c = b.join(B, Seq("one column"), "left_outer").distinct()
val d = c.join(C, Seq("one column"), "left_outer").distinct()
...
val n = m.join(M, Seq("one column"), "left_outer").distinct()
n.write()

it works.

But when I try to optimize the code like this:

val data = List(a, A, B, C, ..., N, M)
val joined = data.reduce((left, right) => left.join(right, Seq("one column"), "left_outer").distinct())
val result = joined.distinct()  // this one works
result.write()                  // this one doesn't work

it fails with the following error:


18/02/28 09:54:35 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_45_90 !
18/02/28 09:54:35 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_108_191 !
[Stage 63:>                                                       (1 + 4) / 200]
18/02/28 09:54:38 WARN TaskSetManager: Lost task 4.1 in stage 63.3 (TID 12686, 127.0.0.1, executor 8): FetchFailed(null, shuffleId=11, mapId=-1, reduceId=4, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1329)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1729)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 44 more

Upvotes: 1

Views: 614

Answers (2)

mingzhao.pro

Reputation: 759

My second solution, which didn't work at first, becomes a working one with a small modification: broadcasting the right-hand side of each join.

import org.apache.spark.sql.functions.broadcast

val data = List(a, A, B, C, ..., N, M)
val joined = data.reduce((left, right) => left.join(broadcast(right), Seq("one column"), "left_outer").distinct())
val result = joined.distinct()  // this one works
result.write()                  // this one works now
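
For context, broadcast(right) is a hint that tells Spark to ship the small right-hand table to every executor, so each join is planned as a broadcast hash join instead of a shuffle join, which is likely why the MetadataFetchFailedException from the shuffle stage disappears. Below is a minimal, self-contained sketch of the same pattern; the join column "id", the toy rows and the local SparkSession are placeholder assumptions for illustration, not the real 18 Datasets.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-join-sketch").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the real Datasets (the real ones have ~10 columns and 1k-10k rows).
    val a = Seq(("1", "5"), ("2", "1")).toDF("id", "value")
    val others = List(
      Seq(("1", "Cliff"), ("2", "Raj")).toDF("id", "name"),
      Seq(("1", 100), ("2", 200)).toDF("id", "number")
    )

    // Same reduce as above: broadcast each right-hand side so the join
    // is planned as a broadcast hash join rather than a shuffle join.
    val joined = (a :: others).reduce((left, right) =>
      left.join(broadcast(right), Seq("id"), "left_outer").distinct())

    joined.show(false)
    spark.stop()
  }
}

Note that Spark already broadcasts tables smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default) on its own; the explicit hint simply forces the broadcast plan when the size estimate is off.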

Upvotes: 1

Ramesh Maharjan

Reputation: 41957

In Scala, foldLeft is a very elegant function that can be used to meet your requirement.
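
As a quick illustration of what foldLeft does (a toy example with plain integers, not the asker's data): it starts from an initial accumulator and folds each element of the list into it, left to right.

// foldLeft(initial)(combine): the accumulator starts at 0 and each element
// of the list is combined into it from left to right.
val total = List(1, 2, 3, 4).foldLeft(0)((acc, x) => acc + x)   // 10
// In the join below, the accumulator is the DataFrame built so far and
// each list element is the next DataFrame to left-join onto it.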

I have three dataframes (you have 18):

a
+---+-----+
| id|value|
+---+-----+
|  0|    2|
|  1|    5|
|  2|    1|
|  3|    3|
|  4|    8|
+---+-----+

root
 |-- id: string (nullable = true)
 |-- value: string (nullable = true)


A
+---+-----+
| id| name|
+---+-----+
|  1|Cliff|
|  2|  Raj|
|  3| Alim|
|  4| Jose|
|  5| Jeff|
+---+-----+

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)


B
+---+------+
| id|number|
+---+------+
|  1|   100|
|  2|   200|
|  3|   300|
|  4|   400|
|  5|   500|
+---+------+

root
 |-- id: string (nullable = true)
 |-- number: integer (nullable = false)

Your working way

You are joining in a serialized manner, as follows:

val joined = a.join(A, Seq("id"), "leftouter").join(B, Seq("id"), "leftouter")

which should give you

+---+-----+-----+------+
|id |value|name |number|
+---+-----+-----+------+
|0  |2    |null |null  |
|1  |5    |Cliff|100   |
|2  |1    |Raj  |200   |
|3  |3    |Alim |300   |
|4  |8    |Jose |400   |
+---+-----+-----+------+

root
 |-- id: string (nullable = true)
 |-- value: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number: integer (nullable = true)

foldLeft way

Now, if we do the same with the foldLeft function:

val data = List(A, B)
val joined = data.foldLeft(a)((left, right) => left.join(right, Seq("id"), "leftouter"))

You should get the same result. I hope the answer helps you understand how to meet your needs.
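
For completeness, here is a minimal, runnable sketch of the foldLeft approach using small stand-in tables like the ones above; the SparkSession setup and the inline rows are assumptions added for illustration, not part of the original answer.

import org.apache.spark.sql.SparkSession

object FoldLeftJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("foldleft-join-sketch").getOrCreate()
    import spark.implicits._

    // Small stand-ins for a, A and B shown above.
    val a = Seq(("0", "2"), ("1", "5"), ("2", "1"), ("3", "3"), ("4", "8")).toDF("id", "value")
    val A = Seq(("1", "Cliff"), ("2", "Raj"), ("3", "Alim"), ("4", "Jose"), ("5", "Jeff")).toDF("id", "name")
    val B = Seq(("1", 100), ("2", 200), ("3", 300), ("4", 400), ("5", 500)).toDF("id", "number")

    // Fold every remaining table into `a` with a left outer join on "id".
    val data = List(A, B)
    val joined = data.foldLeft(a)((left, right) => left.join(right, Seq("id"), "leftouter"))

    joined.show(false)   // same table as shown above
    joined.printSchema() // same schema as shown above
    spark.stop()
  }
}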

Upvotes: 0
