XY.W

Reputation: 104

pyspark, looking for the maximum in a large RDD?

I worked out four methods to solve this problem, but none of them really works for a large RDD. I hope someone can help me out.

I have a Spark RDD in the format ((x,y),(sim, sim')), where x and y are two indexes and sim and sim' are two different similarity measures between x and y. I'm interested in finding the tuple with the maximal sim' value. I figured out a few methods to do so, but each of them has a problem, and in the end none of them can be applied to an RDD of large size, such as an RDD of 1 billion tuples.

Let's say that res_dict is the RDD of ((x,y),(sim,sim')) tuples. Calling res_dict.collect() in pyspark returns:

    [((0, 4), (0.84482865216358305, -0.15517134783641684)),
     ((0, 5), (0.81419456295517345, -0.18580543704482633)),
     ((0, 6), (0.758987946368752, -0.24101205363124789)),
     ((1, 6), (0.85216892617439344, -0.14783107382560645)),
     ((1, 7), (0.82916404722915915, -0.17083595277084063)),
     ((3, 8), (0.89277958989841388, -0.10722041010158612)),
     ((0, 3), (0.88578029179338835, -0.11421970820661154)),
     ((1, 5), (0.8929292047906795, -0.10707079520932028)),
     ((0, 7), (0.72844973106026745, -0.27155026893973233)),
     ((1, 8), (0.73430836128696797, -0.26569163871303192))]

In practice, the initial res_dict is much larger: it goes through (n-1) iterations, and in each iteration the number of tuples in res_dict decreases by as many as (n-ite), where n is the total number of tuples in the initial res_dict and ite is the index of the current iteration, ite = 1, ..., n-1.
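To make the toy case easy to reproduce, here is a minimal sketch (assuming a live SparkContext sc; the similarity values are rounded abbreviations of the tuples above):

    # Hypothetical toy reproduction of res_dict (assumes a SparkContext named `sc`)
    toy = [((0, 4), (0.8448, -0.1552)), ((0, 5), (0.8142, -0.1858)),
           ((1, 6), (0.8522, -0.1478)), ((3, 8), (0.8928, -0.1072))]
    res_dict = sc.parallelize(toy, 4)  # 4 partitions, as a scaled-down stand-in for the real RDD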

Method 1:

res_dict is initially split by .repartition(k) into k partitions (k > 1). Method 1 looks for the pair with the maximal sim' value in each partition, then selects, from the returned list of per-partition maxima, the pair with the maximal sim'.

    def f(iterator): yield max(iterator, key = lambda d:d[1][1])
    max_list = res_dict.mapPartitions(f)
    i_j_sim = max_list.max(key = lambda d:d[1][1])

This method requires each partition to be non-empty; if a partition is empty, max() fails with a "max() arg is an empty sequence" error.

As the size of res_dict shrinks in each iteration, its number of partitions apparently needs to be decided dynamically; otherwise empty partitions emerge and cause the error. So before running the code above, I counted the non-empty partitions of res_dict in the current iteration and repartitioned res_dict with this number:

    from operator import add

    # `length` was not shown in the question; a plausible definition that yields
    # (partition index, element count) pairs:
    def length(index, iterator):
        yield (index, sum(1 for _ in iterator))

    def numNonEmptyPar(anRDD):
        par_ind_size = anRDD.mapPartitionsWithIndex(length)
        numNonEmp = par_ind_size.filter(lambda d: d[1] != 0).map(lambda x: 1).reduce(add)  # reduce is quite slow
        return numNonEmp

    numNonEmpar = numNonEmptyPar(res_dict)
    if numNonEmpar < resPar:
        resPar = numNonEmpar
        res_dict = res_dict.repartition(resPar)
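(As an aside, the same count could probably be obtained more simply with .count() on the filtered RDD; a hypothetical sketch, not from my actual script:)

    # Hypothetical variant: count non-empty partitions without the explicit map/reduce
    def numNonEmptyPar2(anRDD):
        sizes = anRDD.mapPartitionsWithIndex(lambda i, it: [(i, sum(1 for _ in it))])
        return sizes.filter(lambda d: d[1] != 0).count()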

But even after this repartitioning, the same "empty sequence" error occurred.

It seems to me that .repartition() does NOT guarantee that every partition is non-empty (nor does .coalesce()). So how can I make Method 1 work?
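One workaround I can imagine (an untested sketch, not what I actually ran) is to make the per-partition function itself tolerate empty partitions, so no dynamic repartitioning would be needed at all:

    # Untested sketch: yield nothing for empty partitions instead of calling max() on them
    def f_safe(iterator):
        best = None
        for rec in iterator:
            if best is None or rec[1][1] > best[1][1]:
                best = rec
        if best is not None:
            yield best

    i_j_sim = res_dict.mapPartitions(f_safe).max(key=lambda d: d[1][1])

But I would still like to understand why .repartition() leaves empty partitions behind.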

Method 2:

    i_j_sim = res_dict.glom() \
        .map(lambda ls: None if len(ls) == 0 else max(ls, key=lambda d: d[1][1])) \
        .filter(lambda d: d is not None) \
        .max(key=lambda d: d[1][1])  # pyspark built-in func: rdd.max()

This method is similar to Method 1, but it avoids using .mapPartitions(). I thought that by doing so I could get around the problem of Method 1, but this method failed with an error as well.

Method 3:

    i_j_sim = res_dict.max(key=lambda d: d[1][1])

Method 3 failed with an error at the max() call (see the full traceback in the PS below).

For Methods 2 and 3, it looks like max(key=lambda d: d[1][1]) was the problem. I observed that they worked fine for a res_dict of 10,000 tuples, but not for one of a billion tuples. So should rdd.max() only be fed with a small RDD?
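(For reference, the traceback below shows that pyspark's rdd.max() is just a thin wrapper around reduce(), which maps each partition and then collects the partial results to the driver:)

    # Quoted from pyspark/rdd.py as it appears in the traceback further down:
    #   return self.reduce(lambda a, b: max(a, b, key=key))   # inside rdd.max()
    #   vals = self.mapPartitions(func).collect()             # inside rdd.reduce()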

PS: the full traceback of Method 3 was

ssh://[email protected]:22/usr/bin/python -u /home/hduser/Documents/test_Spark/ahc_sim_v1.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/10/14 14:47:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 5:==>(23 + 4) / 32][Stage 6:>  (0 + 12) / 32][Stage 8:>   (0 + 0) / 32]16/10/14 14:48:30 WARN TaskSetManager: Lost task 4.0 in stage 6.0 (TID 68, 159.84.139.245): java.io.StreamCorruptedException: invalid stream header: 12018301
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
at scala.Option.map(Option.scala:146)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
[Stage 5:==>(24 + 4) / 32][Stage 6:>  (0 + 12) / 32][Stage 8:>   (0 + 0) / 32]16/10/14 14:48:31 ERROR TaskSetManager: Task 10 in stage 6.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "/home/hduser/Documents/test_Spark/ahc_sim_v1.py", line 320, in <module>
    i_j_sim = res_dict.max(key=lambda d: d[1][1]) # get one pair with max s'(x,y)
  File "/home/hduser/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py", line 974, in max
    return self.reduce(lambda a, b: max(a, b, key=key))
  File "/home/hduser/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py", line 802, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/hduser/spark-2.0.0-bin-hadoop2.7/python/pyspark/rdd.py", line 776, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/dist-packages/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 6.0 failed 4 times, most recent failure: Lost task 10.3 in stage 6.0 (TID 101, 159.84.139.247): java.io.StreamCorruptedException: invalid stream header: 12018301
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
at scala.Option.map(Option.scala:146)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid stream header: 12018301
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:524)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:522)
at scala.Option.map(Option.scala:146)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:522)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:609)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
Process finished with exit code 1

Method 4:

    max_sim_pr = res_dict.values().values().treeReduce(lambda a, b: max(a, b))  # .values().values() keeps only sim'
    i_j_sim = res_dict.filter(lambda d: d[1][1] == max_sim_pr).first()  # recover one pair carrying that maximal sim'

To avoid the pyspark built-in rdd.max(), I used Python's built-in max() inside treeReduce(). But I got an error at treeReduce() as well.
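For reference, an equivalent one-pass sketch (untested at this scale) would compare full records directly, so the winning (x, y) key comes back together with its sim' and the second filter pass is not needed:

    # Hypothetical one-pass alternative to Method 4: keep key and value together
    i_j_sim = res_dict.treeReduce(lambda a, b: a if a[1][1] >= b[1][1] else b)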

It seemed that for Methods 2, 3 and 4 the real problem occurred in .reduce(), but I don't know why, nor how to solve it.

Upvotes: 2

Views: 399

Answers (1)

XY.W

Reputation: 104

I found out the problem. The error was caused by not having the same spark-defaults.conf file on every worker. After I made the content of this conf file consistent across every node in my cluster, Methods 2, 3 and 4 worked fine. But Method 1 still didn't work: it seems to me that rdd.repartition() cannot ensure that every partition is non-empty, even when the number of elements in the RDD is larger than the number of partitions. I also noticed that .repartition() required a lot of shuffling, so I switched to .coalesce(), which ran faster in my tests.
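For illustration, the switch amounts to something like this minimal sketch (resPar being the target number of partitions from the question):

    # coalesce() merges existing partitions and avoids the full shuffle that repartition() performs
    res_dict = res_dict.coalesce(resPar)  # instead of res_dict.repartition(resPar)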

Upvotes: 0
