Reputation: 4849
I am using PySpark.
I run this code:
a = rdd.map(lambda x: function(x)).toLocalIterator()
from collections import Counter
c = sum(a, Counter())
and I get the following error:
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event
SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@1ada509b)
WARN Utils: Suppressing exception in finally: Connection reset
java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:707)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
    Suppressed: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
        ... 3 more
When I use this code instead, it gives me the right result with no error:
c = Counter()
for i, pair in a:
    c += Counter(pair)
I tried playing with the number of partitions in the RDD map, but nothing works.
The two code sections should behave the same way, so what is the difference? Why does the first one not work?
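For context, here is a minimal self-contained sketch of the pattern; count_words is a hypothetical stand-in for my real function and the tiny list stands in for my real data:

from collections import Counter
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[2]").setAppName("counter-demo"))

# Hypothetical stand-in for the real mapping function: count words per element.
def count_words(line):
    return Counter(line.split())

rdd = sc.parallelize(["a b a", "b c", "a c c"], 2)

# Variant 1: sum the Counters on the driver in one expression.
c1 = sum(rdd.map(count_words).toLocalIterator(), Counter())

# Variant 2: accumulate them one at a time (the version that works for me).
c2 = Counter()
for part in rdd.map(count_words).toLocalIterator():
    c2 += part

print(c1 == c2)  # True -- both should give Counter({'a': 3, 'c': 3, 'b': 2})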
Thanks
Upvotes: 1
Views: 1522
Reputation: 4849
The issue was a lack of memory on the driver. I solved it with:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.driver.memory", "3G")
sc = SparkContext(conf=conf)
The configuration has to be set before the SparkContext is started.
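For reference, a minimal runnable sketch of the fix in context; the local[2] master, the toy data, and the Counter summation below are placeholders standing in for the real job:

from collections import Counter
from pyspark import SparkConf, SparkContext

# spark.driver.memory is only read when the driver starts, so it has to be on
# the SparkConf before the SparkContext is created.
conf = SparkConf().setMaster("local[2]").setAppName("counter-sum")
conf.set("spark.driver.memory", "3G")
sc = SparkContext(conf=conf)

# Toy stand-in for the real RDD: each element becomes a Counter on the driver.
rdd = sc.parallelize([{"a": 1}, {"a": 2, "b": 1}])
c = sum((Counter(d) for d in rdd.toLocalIterator()), Counter())
print(c)  # Counter({'a': 3, 'b': 1})

sc.stop()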
Upvotes: 0