Nikster

Reputation: 474

Apache Spark Group By (get First & Last values in Group)

I am running Hadoop on a VM cluster on my school's cloud (honestly, I don't know the specifics). I'm using Apache Spark to talk to Hadoop and run my current code.

I've been trying to perform some aggregations on my data to find a total consumption value per hour/day/month (the ENERGY_READING column from the data).

CONSUMPTION.tsv, with some manipulations already done:

+--------+-------------------+----+--------------+
|HOUSE_ID|CONDATE            |HOUR|ENERGY_READING|
+--------+-------------------+----+--------------+
|9       |2015-05-30 00:00:00|0   |11000.001444  |
|9       |2015-05-30 00:00:10|0   |11000.002888  |
|9       |2015-05-30 00:00:20|0   |11000.004332  |
|9       |2015-05-30 00:00:30|0   |11000.005776  |
|9       |2015-05-30 00:00:40|0   |11000.00722   |
|9       |2015-05-30 00:00:50|0   |11000.008664  |
|9       |2015-05-30 00:01:00|0   |11000.010108  |
|9       |2015-05-30 00:01:10|0   |11000.011552  |
|9       |2015-05-30 00:01:20|0   |11000.012996  |
|9       |2015-05-30 00:01:30|0   |11000.01444   |
|9       |2015-05-30 00:01:40|0   |11000.015884  |
|9       |2015-05-30 00:01:50|0   |11000.017328  |
|9       |2015-05-30 00:02:00|0   |11000.018772  |
|9       |2015-05-30 00:02:10|0   |11000.020216  |
|9       |2015-05-30 00:02:20|0   |11000.02166   |
|9       |2015-05-30 00:02:30|0   |11000.023104  |
|9       |2015-05-30 00:02:40|0   |11000.024548  |
|9       |2015-05-30 00:02:50|0   |11000.025992  |
|9       |2015-05-30 00:03:00|0   |11000.027436  |
|9       |2015-05-30 00:03:10|0   |11000.02888   |
+--------+-------------------+----+--------------+

Java Class

import static org.apache.spark.sql.types.DataTypes.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

// schema for the raw TSV file
StructType schema = new StructType()
        .add("LOG_ID", IntegerType)
        .add("HOUSE_ID", IntegerType)
        .add("CONDATE", StringType)
        .add("ENERGY_READING", DoubleType)
        .add("FLAG", IntegerType);

Dataset<Row> data = spark.read()
        .option("header", true)
        .option("delimiter", "\t")
        .option("mode", "DROPMALFORMED")
        .schema(schema)
        .csv("hdfs://hd-master:9820/CONSUMPTION.tsv");

// parse the raw CONDATE string into a proper timestamp
// (to_timestamp already returns a timestamp, so no extra cast is needed)
data = data.withColumn("CONDATE",
        functions.to_timestamp(functions.col("CONDATE"), "yy-MM-dd HH:mm:ss.SSSSSSSSS"));

// extract the hour of day for grouping
data = data.withColumn("HOUR", functions.hour(functions.col("CONDATE")));

Dataset<Row> df = data.select("HOUSE_ID", "CONDATE", "HOUR", "ENERGY_READING");

So the data increments every 10 seconds. I want to get the first and last values for each hour/day/month.

Essentially, what I want is the first value of the hour, 11000.001444, and the last value (let's just say 11000.01444 in this case), and then to subtract the first from the last to get the total consumption for that hour/day/month.

Which would give me an output of

+--------+----------+----+--------------+
|HOUSE_ID|CONDATE   |HOUR|ENERGY_READING|
+--------+----------+----+--------------+
|9       |2015-05-30|0   |0.013         |
|9       |2015-05-30|1   |...           |
+--------+----------+----+--------------+
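
My own rough guess is something like the snippet below, using min and max per group: since ENERGY_READING only ever increases, the min and max of each group should equal its first and last values. I'm not sure this is the right approach, though (the hourly variable name is just my placeholder):

// untested guess: since ENERGY_READING only ever increases,
// the min/max per group should equal the first/last readings
Dataset<Row> hourly = df.groupBy(
        functions.col("HOUSE_ID"),
        functions.to_date(functions.col("CONDATE")).alias("CONDATE"),
        functions.col("HOUR")
).agg(
        functions.max("ENERGY_READING")
                .minus(functions.min("ENERGY_READING"))
                .alias("ENERGY_READING")
);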

Upvotes: 0

Views: 305

Answers (1)

mck

Reputation: 42332

The code below groups by minute and calculates the consumption within each minute:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;

Dataset<Row> df2 = df.groupBy(
        functions.col("HOUSE_ID"),
        functions.minute(functions.col("CONDATE")).alias("minute") // minute of the hour (0-59)
).agg(
        // the readings only increase, so the minimum is the first reading of the minute
        functions.min("ENERGY_READING").alias("ENERGY_READING")
).withColumn(
        "LAG_ENERGY_READING",
        functions.lag(functions.col("ENERGY_READING"), 1)
                .over(Window.partitionBy("HOUSE_ID").orderBy("minute"))
).withColumn(
        "consumption",
        functions.expr("ENERGY_READING - LAG_ENERGY_READING")
);
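
The same pattern should extend to the hour/day/month granularities the question asks about by changing the grouping keys. Below is a sketch of an hourly variant (my own untested addition; the hourly2 name is hypothetical), which buckets by calendar day and hour so that readings from different days don't end up in the same group:

// hypothetical hourly variant of the same lag-based pattern
Dataset<Row> hourly2 = df.groupBy(
        functions.col("HOUSE_ID"),
        functions.to_date(functions.col("CONDATE")).alias("CONDATE"),
        functions.col("HOUR")
).agg(
        functions.min("ENERGY_READING").alias("ENERGY_READING")
).withColumn(
        "LAG_ENERGY_READING",
        functions.lag(functions.col("ENERGY_READING"), 1)
                .over(Window.partitionBy("HOUSE_ID").orderBy("CONDATE", "HOUR"))
).withColumn(
        "consumption",
        functions.expr("ENERGY_READING - LAG_ENERGY_READING")
);

For daily or monthly totals, grouping on functions.to_date alone, or on functions.year plus functions.month, should work the same way.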

Upvotes: 1
