Ellen
Ellen

Reputation: 115

Cannot connect to local spark cluster from Java application

I'm trying to run a java application that connects to a local standalone spark cluster. I start the cluster with the default configuration, using start-all.sh. When I go to the web page for the cluster, it is started ok. I can connect to this cluster with SparkR, but when I use the same master URL to connect from within Java, I get an error message.

I'm using Spark 1.5.

Here is my Java code:

SparkConf conf = new SparkConf();
conf.setAppName("test");
conf.setMaster("spark://Ellens-MacBook-Pro.local:7077");
conf.setSparkHome("/Applications/spark-1.5.0-bin-hadoop2.6");
SparkContext sc = new SparkContext(conf);

Here is a snippet of the error message:

ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:7077] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 15/10/09 17:31:41 INFO AppClient$ClientEndpoint: Connecting to master spark://Ellens-MacBook-Pro.local:7077... 15/10/09 17:31:41 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:7077] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 15/10/09 17:32:01 INFO AppClient$ClientEndpoint: Connecting to master spark://Ellens-MacBook-Pro.local:7077... 15/10/09 17:32:01 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main] java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@54e2b678 rejected from java.util.concurrent.ThreadPoolExecutor@5d9f3e0d[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 2] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)

    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)

Here is the output in the Spark log:

Spark Command: /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -cp /Applications/spark-1.5.0-bin-hadoop2.6/sbin/../conf/:/Applications/spark-1.5.0-bin-hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar:/Applications/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/Applications/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/Applications/spark-1.5.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip Ellens-MacBook-Pro.local --port 7077 --webui-port 8080
========================================
15/10/12 17:56:43 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/10/12 17:56:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/10/12 17:56:43 INFO SecurityManager: Changing view acls to: ellenk
15/10/12 17:56:43 INFO SecurityManager: Changing modify acls to: ellenk
15/10/12 17:56:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ellenk); users with modify permissions: Set(ellenk)
15/10/12 17:56:44 INFO Slf4jLogger: Slf4jLogger started
15/10/12 17:56:44 INFO Remoting: Starting remoting
15/10/12 17:56:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:7077]
15/10/12 17:56:44 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
15/10/12 17:56:44 INFO Master: Starting Spark master at spark://Ellens-MacBook-Pro.local:7077
15/10/12 17:56:44 INFO Master: Running Spark version 1.5.0
15/10/12 17:56:44 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/10/12 17:56:44 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.3:8080
15/10/12 17:56:44 INFO Utils: Successfully started service on port 6066.
15/10/12 17:56:44 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
15/10/12 17:56:45 INFO Master: I have been elected leader! New state: ALIVE
15/10/12 17:56:50 INFO Master: Registering worker 192.168.1.3:57180 with 8 cores, 15.0 GB RAM
15/10/12 17:57:23 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:57238] has failed, address is now gated for [5000] ms. Reason: [null] 
15/10/12 17:57:23 INFO Master: 192.168.1.3:57238 got disassociated, removing it.
15/10/12 17:57:43 ERROR Remoting: 
java.io.OptionalDataException
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1371)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:935)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/10/12 17:57:43 INFO Master: 192.168.1.3:57238 got disassociated, removing it.
15/10/12 17:58:03 INFO Master: 192.168.1.3:57238 got disassociated, removing it.
15/10/12 17:58:03 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:57238] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
15/10/12 17:58:03 INFO Master: 192.168.1.3:57238 got disassociated, removing it.
15/10/12 17:58:03 INFO Master: 192.168.1.3:57238 got disassociated, removing it.
15/10/12 17:58:03 INFO Master: 192.168.1.3:57238 got disassociated, removing it.

Upvotes: 2

Views: 3644

Answers (3)

JAg
JAg

Reputation: 1

I am not sure if this would help; I was getting error of not able to connect to spark://master:7077; whereas when I started the master/slave it showed me the spark master at spark://127.0.1.1:7077.

So I mapped "master" to 127.0.1.1 in /etc/hosts, and voila it worked like charm; it could find sparkContext instance easily.

Jag

Upvotes: 0

RichMeister
RichMeister

Reputation: 124

Spark can be connected to locally using the JavaSparkContext. For instance, here is one of the constructors:

JavaSparkContext(String master, String appName, String sparkHome, String[] jars) 

JavaSparkContext was introduced as of 1.4:

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/api/java/JavaSparkContext.html

Here is an example similar to what I have working. I'm using Maven. so the application jar is created under the target directory:

public JavaSparkContext getJavaSparkContext(String myAppName ) {
    // Based on example above:
    String sparkHome="/Applications/spark-1.5.0-bin-hadoop2.6"; 
    JavaSparkContext sc = new JavaSparkContext("local", myAppName,
            sparkHome, new String[]{"target/sparkapp-0.0.1-SNAPSHOT.jar"});
    return sc;

}

The argument myAppName is just a string, for example: "My App"

Upvotes: 0

kostya
kostya

Reputation: 9559

Most likely this is due to Spark version mismatch. Make sure that you are using the same versions (including Scala version that should be 2.10 if you are using one of the provided binary builds) of spark libraries both on the client and server.

Also if you are using akka in your client application you should make sure that it is compatible with the one used by Spark (2.3.4+).

Upvotes: 2

Related Questions