Swa

Reputation: 41

Connection refused while executing a Spark Streaming program in Scala

I'm trying to execute a simple word-count Spark Streaming program in the Cloudera VM. I'm using the Scala REPL (spark-shell), not an IDE.

Here is my code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Reuse the REPL's SparkContext (sc) with a 2-second batch interval
val ssc = new StreamingContext(sc, Seconds(2))

// Read newline-terminated text from a TCP server on localhost:8585
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY)

val wordsFlatMap = lines.flatMap(_.split(" "))
val wordsMap = wordsFlatMap.map(w => (w, 1))
val wordCount = wordsMap.reduceByKey((a, b) => a + b)

wordCount.print()
ssc.start()

When I run this in the REPL, I get a connection-refused error. The following is the output:

    scala> ssc.start
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Starting 1 receivers
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: ReceiverTracker started
17/04/19 03:06:43 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.MappedDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@2e5fd13
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@72f13d46
17/04/19 03:06:43 INFO dstream.MappedDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.MappedDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.MappedDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@5d539aac
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@d895866
17/04/19 03:06:43 INFO dstream.ForEachDStream: Slide time = 2000 ms
17/04/19 03:06:43 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
17/04/19 03:06:43 INFO dstream.ForEachDStream: Checkpoint interval = null
17/04/19 03:06:43 INFO dstream.ForEachDStream: Remember duration = 2000 ms
17/04/19 03:06:43 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7bc99aff
17/04/19 03:06:43 INFO util.RecurringTimer: Started timer for JobGenerator at time 1492596404000
17/04/19 03:06:43 INFO scheduler.JobGenerator: Started JobGenerator at 1492596404000 ms
17/04/19 03:06:43 INFO scheduler.JobScheduler: Started JobScheduler
17/04/19 03:06:43 INFO streaming.StreamingContext: StreamingContext started

scala> 17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Receiver 0 started
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Got job 0 (submitJob at ReceiverTracker.scala:557) with 1 output partitions
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(submitJob at ReceiverTracker.scala:557)
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Missing parents: List()
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554), which has no missing parents
17/04/19 03:06:44 INFO scheduler.JobScheduler: Added jobs for time 1492596404000 ms
17/04/19 03:06:44 INFO scheduler.JobScheduler: Starting job streaming job 1492596404000 ms.0 from job set of time 1492596404000 ms
17/04/19 03:06:44 INFO spark.SparkContext: Starting job: print at <console>:47
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(65984) called with curMem=0, maxMem=560497950
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.4 KB, free 534.5 MB)
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(22354) called with curMem=65984, maxMem=560497950
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 21.8 KB, free 534.4 MB)
17/04/19 03:06:44 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:41905 (size: 21.8 KB, free: 534.5 MB)
17/04/19 03:06:44 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
17/04/19 03:06:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554)
17/04/19 03:06:44 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:42)
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Got job 1 (print at <console>:47) with 1 output partitions
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(print at <console>:47)
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Missing parents: List()
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44), which has no missing parents
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(2400) called with curMem=88338, maxMem=560497950
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 534.4 MB)
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(1429) called with curMem=90738, maxMem=560497950
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1429.0 B, free 534.4 MB)
17/04/19 03:06:45 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:41905 (size: 1429.0 B, free: 534.5 MB)
17/04/19 03:06:45 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44)
17/04/19 03:06:45 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/04/19 03:06:45 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2644 bytes)
17/04/19 03:06:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
17/04/19 03:06:45 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1492596405400
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started BlockGenerator
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started block pushing thread
17/04/19 03:06:45 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 10.0.2.15:50802
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Starting receiver
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
17/04/19 03:06:45 INFO dstream.SocketReceiver: Connecting to localhost:8585
17/04/19 03:06:45 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:8585
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error connecting to localhost:8585: java.net.ConnectException: Connection refused
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
17/04/19 03:06:45 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:8585 - java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)

I get a different error when I change the code as shown below:

val sparkConf = new SparkConf().setAppName("Streaming Example").setMaster("local[2]").set("spark.drive.allowMultipleContexts", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))

This produces the following output:

    17/04/19 03:18:52 INFO spark.SparkContext: Running Spark version 1.5.0-cdh5.5.0
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing view acls to: cloudera
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing modify acls to: cloudera
    17/04/19 03:18:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
    17/04/19 03:18:53 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/04/19 03:18:53 INFO Remoting: Starting remoting
    17/04/19 03:18:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:42235]
    17/04/19 03:18:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@localhost:42235]
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'sparkDriver' on port 42235.
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering MapOutputTracker
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/04/19 03:18:53 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-b87051bc-5b7f-4c4f-975f-a0661b3ec29f
    17/04/19 03:18:53 INFO storage.MemoryStore: MemoryStore started with capacity 534.5 MB
    17/04/19 03:18:53 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3c5d465-ca27-4aa0-ad43-47088abb7703/httpd-01babb12-0237-4faa-9917-394a768cbcaa
    17/04/19 03:18:53 INFO spark.HttpServer: Starting HTTP Server
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52313
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'HTTP file server' on port 52313.
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
        at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.spark-project.jetty.server.Server.doStart(Server.java:293)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904)
        at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246)
        at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $line31.$read$$iwC$$iwC.<init>(<console>:40)
        at $line31.$read$$iwC.<init>(<console>:42)
        at $line31.$read.<init>(<console>:44)
        at $line31.$read$.<init>(<console>:48)
        at $line31.$read$.<clinit>(<console>)
        at $line31.$eval$.<init>(<console>:7)
        at $line31.$eval$.<clinit>(<console>)
        at $line31.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@7bc07922: java.net.BindException: Address already in use
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
        at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
        at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.spark-project.jetty.server.Server.doStart(Server.java:293)
        at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
        at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904)
        at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246)
        at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:474)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $line31.$read$$iwC$$iwC.<init>(<console>:40)
        at $line31.$read$$iwC.<init>(<console>:42)
        at $line31.$read.<init>(<console>:44)
        at $line31.$read$.<init>(<console>:48)
        at $line31.$read$.<clinit>(<console>)
        at $line31.$eval$.<init>(<console>:7)
        at $line31.$eval$.<clinit>(<console>)
        at $line31.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/04/19 03:18:53 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'SparkUI' on port 4041.
    17/04/19 03:18:53 INFO ui.SparkUI: Started SparkUI at http://localhost:4041
    17/04/19 03:18:53 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    17/04/19 03:18:53 INFO storage.BlockManagerMaster: Registered BlockManager
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017)

Can someone help me rectify the error?

Upvotes: 3

Views: 2465

Answers (1)

Nilesh Shaikh

Reputation: 98

Approach 1

The error you are seeing is expected because you used socketTextStream(), which makes Spark create a SocketInputDStream backed by java.net.Socket.

java.net.Socket is a client socket, which means it expects a server to already be listening on the address and port you specify.

So you need something listening on port 8585 of your local machine before you call ssc.start().
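For instance, here is a minimal test-server sketch in plain Scala (the object name and the sample lines are made up for illustration). It accepts a single connection and writes newline-terminated text, which is exactly the format socketTextStream expects:

import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical test server: listens on 8585, accepts one client,
// and emits a line every half second.
object TestLineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8585)
    println("Listening on port 8585 ...")
    val client = server.accept()                 // blocks until the Spark receiver connects
    val out = new PrintWriter(client.getOutputStream, true)
    for (i <- 1 to 100) {
      out.println(s"hello spark streaming $i")   // each line becomes one record in the DStream
      Thread.sleep(500)
    }
    out.close()
    client.close()
    server.close()
  }
}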

To see that the failure is purely about whether a server is listening, try connecting to a host that already has one (you may not need to set the master or appName in your environment):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStream {
  def main(args: Array[String]): Unit = {
    // Use local[2]: with a receiver-based stream, one thread is occupied by
    // the receiver, so at least one more is needed to process the data.
    val conf = new SparkConf().setMaster("local[2]").setAppName("socketstream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // bbc.co.uk:80 has an HTTP server listening, so the connection succeeds
    val mystreamRDD = ssc.socketTextStream("bbc.co.uk", 80)
    mystreamRDD.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

This doesn't print any content, because the app never sends an HTTP request to the BBC server, but it does not get a connection refused exception either, since something is actually listening on that port.

To run a local server on Linux, you can use netcat with a simple command such as

cat data.txt | ncat -l -p 8585
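
If your netcat build rejects that form, the Spark Streaming quick-start guide uses the equivalent

nc -lk 8585

where -l listens and -k keeps the listener open across reconnects, which is convenient here because the receiver retries every 2000 ms. (The official example uses port 9999; 8585 matches the question.)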

If the above code gives the same error, then follow Approach 2.

Approach 2

However, a number of things could be causing the error; the probe sketch after this list can help rule out the first three:

  • You are trying to connect to the wrong IP/port.
  • You have not started your server.
  • Your server is not listening for connections.
  • Your server has too many pending connections waiting to be accepted.
  • A firewall is blocking your connection before it reaches your server.
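
As a quick check, you can attempt a bare client connection yourself. This illustrative snippet (host and port taken from the question) succeeds only if something is accepting TCP connections:

import java.net.{InetSocketAddress, Socket}

// Probe whether anything accepts TCP connections on localhost:8585.
object PortProbe {
  def main(args: Array[String]): Unit = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress("localhost", 8585), 2000) // 2-second timeout
      println("Connected: a server is listening on 8585")
    } catch {
      case e: java.io.IOException =>
        println(s"No server reachable on 8585: ${e.getMessage}")
    } finally {
      socket.close()
    }
  }
}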

Hope this helps.

Upvotes: 2
