user12541449

Reputation:

Group - Count from 2 columns from a Dataframe - Spark Java

I have a Spark Dataframe with 2 columns as below.

Date      | Area
1/1/2016  |  1
3/1/2016  |  4
1/1/2016  |  1
5/1/2016  |  2
1/1/2016  |  3
1/1/2016  |  1
3/1/2016  |  4
1/1/2016  |  2
3/1/2016  |  3
3/1/2016  |  3
1/1/2016  |  4
1/1/2016  |  4
1/1/2016  |  2

And I want an output as

 Day: 1/1/2016 -> There are 3 rows at Area1
               -> There are 2 rows at Area2
               -> There are 1 rows at Area3
               -> There are 2 rows at Area4
 Day: 3/1/2016 -> There are 0 rows at Area1
               -> There are 0 rows at Area2
               -> There are 2 rows at Area3
               -> There are 2 rows at Area4
 Day: 5/1/2016 -> ..........

My Java 8 code for this is:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.sql.*;

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("My 1st Spark app");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

        Dataset<Row> df = sparkSession.read().option("header", true).option("inferSchema", "true").option("timestampFormat", "yyyy-MM-dd hh:mm:ss").csv("hdfs://quickstart.cloudera:8020//user//cloudera//fares.csv");
        // This groups by Date only, which gives the result described below.
        Dataset<Row> areaCounts = df.groupBy("Date").count();
        areaCounts.show();
    }
}

But this gives me a result grouped by date only, not by area. How can I group by both date and area?

Upvotes: 0

Views: 166

Answers (1)

Manish

Reputation: 1157

This can be done with a Spark SQL window function plus a for-each loop over the collected DataFrame (not ideal for large data, since collect() pulls every row to the driver and the job gets slower). Below is PySpark code; you can convert it to Java, as the main Spark SQL query won't change. In Java, loop over the array returned by sparkDataFrame.collect() and access each element (a rough Java sketch of that conversion is included after the output below).

    from pyspark.sql.functions import *

    data.createOrReplaceTempView("tmp")

    # final = data.groupBy("Area").agg(count("Date"))
    # final.show(20,False)

    df = spark.sql("""
    SELECT distinct date, 
                    area, 
                    count(area) over (partition by date,area order by date,area) as area_cnt,
                    min(area) over (partition by date order by date,area) as area_first,
                    max(area) over (partition by date order by date,area desc) as area_last
    from tmp
    order by date, area
    """)
    df.show(20,False)

    for i in df.collect() :
        if i.area_first == i.area : 
            print("Day: " + i.date + " -> There are " + str(i.area_cnt) + " rows at Area" + str(i.area))
        else :
            print("              -> There are " + str(i.area_cnt) + " rows at Area" + str(i.area))

    Query result (df.show()):
    +--------+----+--------+----------+---------+
    |date    |area|area_cnt|area_first|area_last|
    +--------+----+--------+----------+---------+
    |1/1/2016|1   |3       |1         |4        |
    |1/1/2016|2   |2       |1         |4        |
    |1/1/2016|3   |1       |1         |4        |
    |1/1/2016|4   |2       |1         |4        |
    |3/1/2016|3   |2       |3         |4        |
    |3/1/2016|4   |2       |3         |4        |
    |5/1/2016|2   |1       |2         |2        |
    +--------+----+--------+----------+---------+

    Output :
    Day: 1/1/2016 -> There are 3 rows at Area1
                -> There are 2 rows at Area2
                -> There are 1 rows at Area3
                -> There are 2 rows at Area4
    Day: 3/1/2016 -> There are 2 rows at Area3
                -> There are 2 rows at Area4
    Day: 5/1/2016 -> There are 1 rows at Area2
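For reference, here is a rough Java sketch of the same approach. It is not from the original answer: the session setup, the input path, and the view name "tmp" are assumptions, the unused area_last column is dropped, and it has the same caveat that collecting brings every row to the driver.

    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class AreaCountsPerDay {

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("Area counts per day")
                    .master("local[*]")
                    .getOrCreate();

            // Input path and options are assumptions; adjust to your file.
            Dataset<Row> data = spark.read()
                    .option("header", true)
                    .option("inferSchema", true)
                    .csv("fares.csv");

            data.createOrReplaceTempView("tmp");

            // Window query from the PySpark answer above (area_last omitted, it is not used in the loop).
            Dataset<Row> df = spark.sql(
                    "SELECT distinct date, area, "
                    + "count(area) over (partition by date, area order by date, area) as area_cnt, "
                    + "min(area) over (partition by date order by date, area) as area_first "
                    + "from tmp order by date, area");

            // collectAsList() pulls every row to the driver, so this does not scale to large data.
            List<Row> rows = df.collectAsList();
            for (Row row : rows) {
                String date = row.getAs("date");
                Integer area = row.getAs("area");
                Long cnt = row.getAs("area_cnt");
                Integer first = row.getAs("area_first");
                if (area.equals(first)) {
                    System.out.println("Day: " + date + " -> There are " + cnt + " rows at Area" + area);
                } else {
                    System.out.println("              -> There are " + cnt + " rows at Area" + area);
                }
            }

            spark.stop();
        }
    }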

Upvotes: 2
