Reputation: 799
I have a Spark application (version 2.1) writing data to an Ignite server cache (version 2.2). This is the code I use to create the cache from an IgniteContext in the Spark job:
object Spark_Streaming_Processing {
case class Custom_Class(
@(QuerySqlField @field)(index = true) a: String,
@(QuerySqlField @field)(index = true) b: String,
@(QuerySqlField @field)(index = true) c: String,
@(QuerySqlField @field)(index = true) d: String,
@(QuerySqlField @field)(index = true) e: String,
@(QuerySqlField @field)(index = true) f: String,
@(QuerySqlField @field)(index = true) g: String,
@(QuerySqlField @field)(index = true) h: String
)
//START IGNITE CONTEXT
val addresses=new util.ArrayList[String]()
addresses.add("127.0.0.1:48500..48520")
val igniteContext: IgniteContext = new IgniteContext(
  sqlContext.sparkContext,
  () => new IgniteConfiguration()
    .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses)))
    .setCacheConfiguration(new CacheConfiguration[String, Custom_Class]()
      .setName("Spark_Ignite").setBackups(1).setIndexedTypes(classOf[String], classOf[Custom_Class])),
  true)
println(igniteContext.ignite().cacheNames())
val ignite_cache_rdd:IgniteRDD[String,Custom_Class] =igniteContext.fromCache[String,Custom_Class]("Spark_Ignite")
val processed_PairRDD: RDD[(String, Custom_Class)] = (...) // RDD holding the data; its key and value types match the cache's
ignite_cache_rdd.savePairs(processed_PairRDD)
}
}
Everything was working fine, but yesterday I decided to destroy the Spark_Ignite cache and restart the Ignite server. Now, when I run the Spark application again, I get the following error:
javax.cache.CacheException: Failed to find data nodes for cache: Spark_Ignite
at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.stableDataNodes(GridReduceQueryExecutor.java:447)
at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.query(GridReduceQueryExecutor.java:591)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$8.iterator(IgniteH2Indexing.java:1160)
If I go to the Ignite visor I can see that the cache was created on the Ignite server side, and I can see that the Ignite server detects the Spark application's Ignite client:
[12:17:01] Topology snapshot [ver=4, servers=1, clients=1, CPUs=12, heap=1.9GB]
However, the Ignite client in the Spark application doesn't seem to detect the server node when it starts, even though the cache was created:
18/04/13 12:17:01 INFO GridDiscoveryManager: Topology snapshot [ver=4, servers=0, clients=1, CPUs=12, heap=0.89GB]
18/04/13 12:17:07 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
javax.cache.CacheException: Failed to find data nodes for cache: Spark_Ignite
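To compare the two snapshot lines side by side, here is a minimal sketch that extracts the server and client counts from such a log line. The `TopologySnapshot` object and its `counts` method are hypothetical names made up for illustration, not part of the Ignite API, and the regex assumes the `servers=..., clients=...` layout seen in the log lines above.

```scala
// Hypothetical helper (not part of Ignite): pulls the server and client
// counts out of a "Topology snapshot" log line.
object TopologySnapshot {
  // Assumes the "servers=<n>, clients=<n>" layout seen in the logs above.
  private val Counts = """servers=(\d+), clients=(\d+)""".r.unanchored

  // Returns Some((servers, clients)) if the line contains both counts, else None.
  def counts(line: String): Option[(Int, Int)] = line match {
    case Counts(servers, clients) => Some((servers.toInt, clients.toInt))
    case _                        => None
  }
}

object TopologySnapshotDemo extends App {
  val serverSide =
    "[12:17:01] Topology snapshot [ver=4, servers=1, clients=1, CPUs=12, heap=1.9GB]"
  val clientSide =
    "18/04/13 12:17:01 INFO GridDiscoveryManager: Topology snapshot [ver=4, servers=0, clients=1, CPUs=12, heap=0.89GB]"

  println(TopologySnapshot.counts(serverSide)) // server's view of the topology
  println(TopologySnapshot.counts(clientSide)) // client's view of the topology
}
```

Applied to the two lines quoted above, the server's view yields one server and one client, while the client's view yields zero servers and one client, which is the mismatch described here.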
I should add that the Ignite client won't start if the Ignite server is not up: it looks for the server at the given addresses and just hangs. Once I start the server, the cache is created, and only then do I get this error.
What might the issue be here? And how is it that it was working before?
Thank you.
UPDATE
Here is the xml config file that I use for my single Ignite server:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-2.0.xsd">
<bean id="ignite_cluster.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="ignite_node1" />
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- Initial local port to listen to. -->
<property name="localPort" value="48510"/>
<!-- Changing local port range. This is an optional action. -->
<property name="localPortRange" value="20"/>
<!-- Setting up IP finder for this cluster -->
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- Addresses and port range of the nodes from the first cluster.
127.0.0.1 can be replaced with actual IP addresses or host names.
Port range is optional. -->
<value>127.0.0.1:48500..48520</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="48100"/>
</bean>
</property>
</bean>
<bean class="org.apache.ignite.configuration.FileSystemConfiguration">
<property name="secondaryFileSystem">
<bean class="org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem">
<property name="fileSystemFactory">
<bean class="org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory">
<property name="uri" value="hdfs://localhost:9000/"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean class="org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory">
<property name="uri" value="hdfs://localhost:9000/"/>
<property name="configPaths">
<list>
<value>/etc/hadoop-2.9.0/etc/hadoop/core-site.xml</value>
</list>
</property>
</bean>
</beans>
Upvotes: 2
Views: 1210
Reputation: 4225
I encountered a similar problem with the exact same error message:
javax.cache.CacheException: Failed to find data nodes for cache: <cache name>
at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.stableDataNodes(GridReduceQueryExecutor.java:447)
at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.query(GridReduceQueryExecutor.java:591)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$8.iterator(IgniteH2Indexing.java:1160)
What resolved it for me was printing the baseline topology and then removing a node from it by its consistent ID:
./control.sh --baseline
./control.sh --baseline remove <ConsistentID>
Upvotes: 0