khateeb

Reputation: 5469

How do I group data by a field in Spark?

I want to read two columns from a database table, group them by the first column, and insert the result into another table using Spark. My program is written in Java. I tried the following:

public static void aggregateSessionEvents(org.apache.spark.SparkContext sparkContext) {
    com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD<String, String> logs = javaFunctions(sparkContext)
            .cassandraTable("dove", "event_log", mapColumnTo(String.class), mapColumnTo(String.class))
            .select("session_id", "event");
    logs.groupByKey();
    com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions(logs)
            .writerBuilder("dove", "event_aggregation", null)
            .saveToCassandra();
    sparkContext.stop();
}

This is giving me the error:

The method cassandraTable(String, String, RowReaderFactory<T>) in the type SparkContextJavaFunctions is not applicable for the arguments (String, String, RowReaderFactory<String>, mapColumnTo(String.class))

My dependencies are:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.0.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.0.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
</dependencies>

How do I solve this?

Upvotes: 1

Views: 330

Answers (2)

khateeb

Reputation: 5469

To group data by a field, follow these steps:

  1. Read the table into a JavaRDD, mapping each row to a row class.
  2. Extract the required columns into a JavaPairRDD, with the grouping field as the key and the rest of the data as the value.
  3. Use reduceByKey to aggregate the values as required.

After that, the data can be inserted into another table or used for further processing; an illustrative write-back sketch follows the example below.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public static void aggregateSessionEvents(SparkContext sparkContext) {
    // Step 1: read the table into a JavaRDD, mapping each row to a Data bean
    JavaRDD<Data> datas = javaFunctions(sparkContext).cassandraTable("test", "data", mapRowTo(Data.class));
    // Step 2: pair each record, grouping field as key, remaining data as value
    JavaPairRDD<String, String> pairDatas = datas.mapToPair(data -> new Tuple2<>(data.getKey(), data.getValue()));
    // Step 3: reduceByKey returns a new RDD, so keep the result
    JavaPairRDD<String, String> aggregated = pairDatas.reduceByKey((value1, value2) -> value1 + "," + value2);
    sparkContext.stop();
}
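
If the aggregated result must then be written to another table, one possible sketch is below. The EventAggregation bean, its constructor, and the target table name data_aggregation are illustrative assumptions, not part of the question; the bean's properties must match the target table's columns (mapToRow is statically imported from CassandraJavaUtil).

// Hypothetical write-back: EventAggregation and "data_aggregation" are assumed
// names; adapt them to the real target table and its columns.
JavaRDD<EventAggregation> rows = aggregated
        .map(pair -> new EventAggregation(pair._1(), pair._2()));
javaFunctions(rows)
        .writerBuilder("test", "data_aggregation", mapToRow(EventAggregation.class))
        .saveToCassandra();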

Upvotes: 0

Sreekar

Reputation: 1015

Change this:

.cassandraTable("dove", "event_log", mapColumnTo(String.class), mapColumnTo(String.class))

to:

.cassandraTable("dove", "event_log", mapColumnTo(String.class))

You are passing an extra argument: as the compiler error shows, cassandraTable accepts a single RowReaderFactory.
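
If you still need both columns as a key/value pair, a minimal sketch of one alternative follows: read generic CassandraRow objects with the two-argument cassandraTable overload and build the pairs yourself. The variable names are illustrative.

// Sketch: read rows generically, then map to (session_id, event) pairs.
JavaPairRDD<String, String> logs = javaFunctions(sparkContext)
        .cassandraTable("dove", "event_log")
        .select("session_id", "event")
        .mapToPair(row -> new Tuple2<>(row.getString("session_id"), row.getString("event")));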

Upvotes: 1
