Reputation: 602
I am using a simple Java program to index a Spark JavaRDD into Elasticsearch. My code looks like this:
// Driver configuration: point Spark at the standalone master and
// elasticsearch-hadoop at the Elasticsearch node.
SparkConf conf = new SparkConf().setAppName("IndexDemo").setMaster("spark://ct-0094:7077");
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "192.168.50.103");
conf.set("es.port", "9200");

JavaSparkContext sc = new JavaSparkContext(conf);
sc.addJar("./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

String arrayval = "string";
List<Data> data = Arrays.asList(
        new Data(1L, 10L, arrayval + "1"),
        new Data(2L, 20L, arrayval + "2"),
        new Data(3L, 30L, arrayval + "3"),
        new Data(4L, 40L, arrayval + "4"),
        new Data(5L, 50L, arrayval + "5"),
        new Data(6L, 60L, arrayval + "6"),
        new Data(7L, 70L, arrayval + "7"),
        new Data(8L, 80L, arrayval + "8"),
        new Data(9L, 90L, arrayval + "9"),
        new Data(10L, 100L, arrayval + "10"));

JavaRDD<Data> javaRDD = sc.parallelize(data);
// saveToEs is statically imported from
// org.elasticsearch.spark.rdd.api.java.JavaEsSpark
saveToEs(javaRDD, "index/type");
Running the above code throws an exception. Stack trace:
15/01/16 13:20:41 INFO spark.SecurityManager: Changing view acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: Changing modify acls to: root
15/01/16 13:20:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/01/16 13:20:41 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/16 13:20:41 INFO Remoting: Starting remoting
15/01/16 13:20:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ct-0015:55586]
15/01/16 13:20:41 INFO util.Utils: Successfully started service 'sparkDriver' on port 55586.
15/01/16 13:20:41 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/16 13:20:41 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/16 13:20:41 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150116132041-f924
15/01/16 13:20:41 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB
15/01/16 13:20:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/16 13:20:41 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a65b108f-e131-480a-85b2-ed65650cf991
15/01/16 13:20:42 INFO spark.HttpServer: Starting HTTP Server
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:34049
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'HTTP file server' on port 34049.
15/01/16 13:20:42 INFO server.Server: jetty-8.1.14.v20131031
15/01/16 13:20:42 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/01/16 13:20:42 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/01/16 13:20:42 INFO ui.SparkUI: Started SparkUI at http://ct-0015:4040
15/01/16 13:20:42 INFO client.AppClient$ClientActor: Connecting to master spark://ct-0094:7077...
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150116131933-0078
15/01/16 13:20:42 INFO netty.NettyBlockTransferService: Server created on 34762
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/16 13:20:42 INFO storage.BlockManagerMasterActor: Registering block manager ct-0015:34762 with 2.3 GB RAM, BlockManagerId(<driver>, ct-0015, 34762)
15/01/16 13:20:42 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/16 13:20:42 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/01/16 13:20:43 INFO spark.SparkContext: Added JAR ./target/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://192.168.50.103:34049/jars/SparkPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1421394643161
Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:30)
    at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:24)
    at org.elasticsearch.spark.rdd.api.java.JavaEsSpark$.saveToEs(JavaEsSpark.scala:28)
    at org.elasticsearch.spark.rdd.api.java.JavaEsSpark.saveToEs(JavaEsSpark.scala)
    at com.cleartrail.spark.poc.elasticsearch.demo.ESPerformerClass.main(ESPerformerClass.java:39)
I have the following dependencies in pom.xml:
<dependencies>
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.spark-project</groupId>
        <artifactId>spark-streaming_2.9.2</artifactId>
        <version>0.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>
I am using Elasticsearch 0.90.3 and Apache Spark 1.2.0.
Is there a version mismatch? Or is the saveToEs method deprecated?
Upvotes: 0
Views: 2547
Reputation: 602
There was indeed a version mismatch, but not between Elasticsearch and elasticsearch-hadoop; it was between elasticsearch-hadoop and Spark. At the time of writing, the Elasticsearch integration supports Spark only up to 1.1.0, so I downgraded Spark and the problem went away.
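For reference, a minimal sketch of what the corrected dependencies could look like, assuming the only change is pinning the Spark artifacts to 1.1.0 (version numbers follow the answer above; check the elasticsearch-hadoop release notes for the exact Spark versions each release supports):

<!-- Sketch: Spark pinned to 1.1.0, which the answer above reports as
     the version elasticsearch-hadoop 2.0.2 works with. The legacy
     org.spark-project:spark-streaming_2.9.2 entry from the question is
     omitted, since spark-streaming_2.10 already covers streaming. -->
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>2.0.2</version>
    </dependency>
</dependencies>

Keeping every Spark artifact at the same version and the same Scala suffix (_2.10) avoids the mixed-classpath situation that typically produces IncompatibleClassChangeError.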
Upvotes: 0