thebeancounter

Reputation: 4849

PySpark toLocalIterator: listener has stopped

I am using PySpark.

I use this code:

from collections import Counter

a = rdd.map(lambda x: function).toLocalIterator()
c = sum(a, Counter())

and get the following error:

ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@1ada509b)

WARN Utils: Suppressing exception in finally: Connection reset
java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$2.apply$mcV$sp(PythonRDD.scala:707)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:706)
Suppressed: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
    ... 3 more

When I use this code instead, it gives me the right result with no error:

c = Counter()
for i, pair in a:
    c += Counter(pair)
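
I guess I could also do the whole aggregation on the executors instead of streaming everything through toLocalIterator(), roughly like the sketch below (assuming each mapped element is an (i, pair) tuple as in the loop above, with pair being something Counter can consume), but I would still like to understand the failure:

# A sketch, assuming each mapped element is an (i, pair) tuple: build a
# Counter per element on the executors and merge them with reduce(), so
# only the final Counter is shipped to the driver.
from collections import Counter

c = (rdd.map(lambda x: function)
        .map(lambda t: Counter(t[1]))
        .reduce(lambda a, b: a + b))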

I tried playing with the number of partitions in the RDD map, but nothing helped.

The two code sections should work in the same way. What is the difference, and why does the first one fail?

Thanks

Upvotes: 1

Views: 1522

Answers (1)

thebeancounter

Reputation: 4849

The issue was a lack of memory on the driver. I solved it with:

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.driver.memory", "3G")
sc = SparkContext(conf=conf)

i.e. setting the value before starting the Spark context.
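
Note that in client mode spark.driver.memory set through SparkConf may be ignored, because the driver JVM is already running by the time the conf is applied; in that case it has to be passed via spark-submit's --driver-memory option instead. One way to check whether the value actually took effect (a quick sketch, reusing the sc created above) is to read it back from the live context:

# Read the driver memory setting back from the running SparkContext; if it
# still shows the default, pass --driver-memory to spark-submit instead.
print(sc.getConf().get("spark.driver.memory", "not set"))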

Upvotes: 0
