Reputation: 5469
I want to read two columns from a database table, group the rows by the first column, and insert the result into another table using Spark. My program is written in Java. I tried the following:
public static void aggregateSessionEvents(org.apache.spark.SparkContext sparkContext) {
com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD<String, String> logs = javaFunctions(sparkContext)
.cassandraTable("dove", "event_log", mapColumnTo(String.class), mapColumnTo(String.class))
.select("session_id", "event");
logs.groupByKey();
com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions(logs).writerBuilder("dove", "event_aggregation", null).saveToCassandra();
sparkContext.stop();
}
This gives me the following compile error:
The method cassandraTable(String, String, RowReaderFactory<T>) in the type SparkContextJavaFunctions is not applicable for the arguments (String, String, RowReaderFactory<String>, mapColumnTo(String.class))
My dependencies are:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
How do I solve this?
Upvotes: 1
Views: 330
Reputation: 5469
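To group data by a field, read the table into a JavaRDD, map it to a JavaPairRDD keyed by that field, and then combine the values for each key with reduceByKey.
After that, the data can be inserted into another table or used for further processing.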
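public static void aggregateSessionEvents(SparkContext sparkContext) {
    // Read each row of test.data into a Data object (Data must expose getKey() and getValue()).
    JavaRDD<Data> datas = javaFunctions(sparkContext).cassandraTable("test", "data",
            mapRowTo(Data.class));
    // Key each row by the field to group on.
    JavaPairRDD<String, String> pairDatas = datas
            .mapToPair(data -> new Tuple2<>(data.getKey(), data.getValue()));
    // reduceByKey returns a new RDD, so keep the result instead of discarding it.
    JavaPairRDD<String, String> grouped = pairDatas
            .reduceByKey((value1, value2) -> value1 + "," + value2);
    // 'grouped' can now be saved to another table or processed further.
    sparkContext.stop();
}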
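To make the reduceByKey step above concrete, here is a minimal plain-Java sketch of what that merge function does to the values of each key. It does not use Spark or Cassandra: the class ReduceByKeySketch, the Row type, and the sample session data are hypothetical names invented for illustration, and an in-memory list stands in for the RDD.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeySketch {

    // Hypothetical stand-in for one (key, value) row read from the table.
    public static final class Row {
        final String key;
        final String value;

        public Row(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies the same merge function as the answer, (v1, v2) -> v1 + "," + v2,
    // to an in-memory list instead of an RDD.
    public static Map<String, String> reduceByKey(List<Row> rows) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Row row : rows) {
            // merge() stores the value for a new key; for an existing key it
            // combines the stored value with the new one.
            result.merge(row.key, row.value, (v1, v2) -> v1 + "," + v2);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("s1", "login"),
                new Row("s2", "view"),
                new Row("s1", "click"));
        System.out.println(reduceByKey(rows)); // prints {s1=login,click, s2=view}
    }
}
```

In this sketch the values are concatenated in list order. In Spark the merge function is applied across partitions, so the order of the concatenated values should not be relied upon.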
Upvotes: 0
Reputation: 1015
Change this:
.cassandraTable("dove", "event_log", mapColumnTo(String.class), mapColumnTo(String.class))
to:
.cassandraTable("dove", "event_log", mapColumnTo(String.class))
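You are passing an extra argument: as the error message shows, cassandraTable takes only one RowReaderFactory after the keyspace and table names.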
Upvotes: 1