Pavel Krutikhin

Reputation: 53

Error using spark remote cluster in java spring app

I'm trying to use Spark 2.2.1 in my Spring Boot 2.0 application. In my app I'm trying to connect to remote standalone Spark Cluster. Here is my Spark config and Spark context beans:

@Bean
public SparkConf sparkConf() {
    return new SparkConf()
            .setAppName("testSpark")
            .setMaster("spark://<spark-ip>:7077")
            .setJars(new String[]{"/path/to/my/app.jar"})
            .set("spark.cassandra.connection.host",env.getProperty(AppConfig.CONTACTPOINTS));
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

But when I start my app, it throws the following error:

2018-03-27 12:36:46.933  INFO 18185 --- [er-threadpool-0] s.d.c.StandaloneAppClient$ClientEndpoint : Connecting to master spark://<spark-ip>:7077...
2018-03-27 12:36:46.989  INFO 18185 --- [pc-connection-0] o.a.s.n.client.TransportClientFactory    : Successfully created connection to /<spark-ip>:7077 after 37 ms (0 ms spent in bootstraps)
2018-03-27 12:36:47.000 ERROR 18185 --- [ rpc-client-9-1] o.a.s.network.client.TransportClient     : Failed to send RPC 7111253898393420882 to /<spark-ip>:7077: java.lang.AbstractMethodError
java.lang.AbstractMethodError: null
        at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111) ~[netty-codec-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302) ~[netty-handler-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38) [netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.22.Final.jar!/:4.1.22.Final]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161]

I also tried using local[*] Spark instead of the remote cluster, and it worked. I also tried building a simple Java Spark app that connects to the remote cluster, but without Spring, and that worked too.

So what could be the issue in using Spark with Spring?

Upvotes: 0

Views: 687

Answers (1)

Pavel Krutikhin

Reputation: 53

As @ErnestKiwele hinted, the problem is caused by conflicting Netty dependencies.

Spring Boot 2.0 (with Spring 5) uses Netty 4.1.x, whereas Spark 2.2.1 uses Netty 4.0.x. To solve this, you can override the Netty dependency in your pom to force the older version:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.43.Final</version>
</dependency>  

Maven will then resolve Netty to this version, which fixed my problem.
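Alternatively, if your project inherits from `spring-boot-starter-parent`, the Netty version is managed through a property in `spring-boot-dependencies`, so you may be able to pin it in `<properties>` instead of declaring the dependency directly (a sketch; this assumes the parent-POM setup, so check your own build):

```xml
<properties>
    <!-- Pin the Netty version managed by spring-boot-dependencies
         to the 4.0.x line that Spark 2.2.1 expects -->
    <netty.version>4.0.43.Final</netty.version>
</properties>
```

Either way, you can check which Netty version actually lands on the classpath with `mvn dependency:tree -Dincludes=io.netty`.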

But if you also need other Spring libraries (e.g. Spring Data Cassandra, as I do), this override may cause problems, because Spring 5 is not compatible with the older Netty version. In other words, Spark 2.2.x and lower is not compatible with Spring 5. Spark 2.3.0 uses Netty 4.1.x and has no problems with Spring 5 (but it still has a Cassandra connector bug, which is critical for me). So, in my case, I had to fall back to Spring Boot 1.5.x (Spring 4), which uses a compatible Netty version and works fine with Spark 2.2.1.
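For reference, a minimal sketch of that fallback setup might look like the following (the Spring Boot patch version is illustrative; only the Spark coordinates come from my actual build):

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <!-- Any Spring Boot 1.5.x release should work; 1.5.10 shown as an example -->
    <version>1.5.10.RELEASE</version>
</parent>

<dependencies>
    <!-- Spark core pulls in Netty 4.0.x transitively,
         which is compatible with Spring 4 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>
```

With this combination no Netty override is needed, since both Spring 4 and Spark 2.2.1 agree on the 4.0.x line.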

Upvotes: 1
