I have a Spark DataFrame with two columns, as below.
Date | Area
1/1/2016 | 1
3/1/2016 | 4
1/1/2016 | 1
5/1/2016 | 2
1/1/2016 | 3
1/1/2016 | 1
3/1/2016 | 4
1/1/2016 | 2
3/1/2016 | 3
3/1/2016 | 3
1/1/2016 | 4
1/1/2016 | 4
1/1/2016 | 2
I want output like this:
Day: 1/1/2016 -> There are 3 rows at Area1
-> There are 2 rows at Area2
-> There are 1 rows at Area3
-> There are 2 rows at Area4
Day: 3/1/2016 -> There are 0 rows at Area1
-> There are 0 rows at Area2
-> There are 2 rows at Area3
-> There are 2 rows at Area4
Day: 5/1/2016 -> ..........
My Java 8 code for this is:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.sql.*;

public class Main {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("My 1st Spark app");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
        Dataset<Row> df = sparkSession.read()
                .option("header", true)
                .option("inferSchema", "true")
                .option("timestampFormat", "yyyy-MM-dd hh:mm:ss")
                .csv("hdfs://quickstart.cloudera:8020//user//cloudera//fares.csv");
        // Groups by Date only, so the Area column is lost in the result.
        Dataset<Row> counts = df.groupBy("Date").count();
    }
}
But this gives me a result grouped by date only, not by area. How can I group by both date and area?
Upvotes: 0
Views: 166
Reputation: 1157
This can be done using Spark SQL window functions and a for loop over the rows returned by collect() (not ideal for large data, since collect() pulls every row to the driver and the job will get slower). Below is PySpark code; you can convert it to Java, as the main Spark SQL query won't change. Afterwards, use a Java for loop to access every element of the array returned by sparkDataFrame.collect().
from pyspark.sql.functions import *

# `data` is the DataFrame loaded from the CSV; `spark` is the SparkSession.
data.createOrReplaceTempView("tmp")
# final = data.groupBy("Area").agg(count("Date"))
# final.show(20,False)
df = spark.sql("""
    SELECT distinct date,
           area,
           count(area) over (partition by date, area order by date, area) as area_cnt,
           min(area) over (partition by date order by date, area) as area_first,
           max(area) over (partition by date order by date, area desc) as area_last
    from tmp
    order by date, area
""")
df.show(20, False)
# collect() brings every row to the driver, so this only suits small results.
for i in df.collect():
    if i.area_first == i.area:
        # First area of the day: print the "Day:" header on the same line.
        print("Day: " + i.date + " -> There are " + str(i.area_cnt) + " rows at Area" + str(i.area))
    else:
        print(" -> There are " + str(i.area_cnt) + " rows at Area" + str(i.area))
Result of df.show(20, False):
+--------+----+--------+----------+---------+
|date    |area|area_cnt|area_first|area_last|
+--------+----+--------+----------+---------+
|1/1/2016|1   |3       |1         |4        |
|1/1/2016|2   |2       |1         |4        |
|1/1/2016|3   |1       |1         |4        |
|1/1/2016|4   |2       |1         |4        |
|3/1/2016|3   |2       |3         |4        |
|3/1/2016|4   |2       |3         |4        |
|5/1/2016|2   |1       |2         |2        |
+--------+----+--------+----------+---------+
Output :
Day: 1/1/2016 -> There are 3 rows at Area1
-> There are 2 rows at Area2
-> There are 1 rows at Area3
-> There are 2 rows at Area4
Day: 3/1/2016 -> There are 2 rows at Area3
-> There are 2 rows at Area4
Day: 5/1/2016 -> There are 1 rows at Area2
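Note that the output above skips areas that have no rows on a given day, whereas the question asks for those to be listed with a count of 0. One possible way to fill the gaps is on the driver, after collect(). The sketch below is plain Python and makes a few assumptions: format_report, counted and areas are hypothetical names, the list of areas to report (1 to 4) is taken from the sample data, and counted stands in for the collected query result, which could be built as [(r.date, r.area, r.area_cnt) for r in df.collect()].

```python
def format_report(counted, areas):
    """Build report lines, one per (date, area), with 0 for missing pairs.

    counted: list of (date, area, count) tuples, e.g. from df.collect().
    areas:   every area to report for each day, in display order (assumed known).
    """
    by_key = {(date, area): cnt for date, area, cnt in counted}
    # Keep dates in first-seen order; dicts preserve insertion order.
    dates = list(dict.fromkeys(date for date, _, _ in counted))
    lines = []
    for date in dates:
        for idx, area in enumerate(areas):
            # Only the first line of each day carries the "Day:" header.
            prefix = "Day: " + date if idx == 0 else ""
            cnt = by_key.get((date, area), 0)  # 0 when the pair never occurs
            lines.append(prefix + " -> There are " + str(cnt) + " rows at Area" + str(area))
    return lines


# Sample values copied from the df.show() table above.
counted = [
    ("1/1/2016", 1, 3), ("1/1/2016", 2, 2), ("1/1/2016", 3, 1), ("1/1/2016", 4, 2),
    ("3/1/2016", 3, 2), ("3/1/2016", 4, 2),
    ("5/1/2016", 2, 1),
]

for line in format_report(counted, [1, 2, 3, 4]):
    print(line)
```

With the sample values, 3/1/2016 now gets "There are 0 rows" lines for Area1 and Area2, as the question requested. For large results, filling the gaps inside Spark (for example by joining against all date/area combinations) would avoid collecting to the driver; that variant is not shown here.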
Upvotes: 2