I have a table of the following form:
Span Available Time
A 0 0
B 1 0
C 1 0
A 1 1
B 0 1
C 1 1
... ... ...
A 1 N
B 0 N
C 1 N
I want to group this into groups of X Times per Span, so it would look like:
Span Available Time
A 1 0
A 0 1
... ... ...
A 1 X
B 1 0
B 1 1
... ... ...
B 0 X
C 0 0
C 1 1
... ... ...
C 0 X
A 1 X+1
A 0 X+2
... ... ...
A 1 2X
B 1 X+1
B 1 X+2
... ... ...
B 0 2X
... ... ...
... ... ...
A 0 N-X
A 1 N-X+1
... ... ...
A 0 N
B 1 N-X
B 0 N-X+1
... ... ...
B 1 N
C 0 N-X
C 1 N-X+1
... ... ...
C 1 N
where X is a factor of N.
How can I group the data in this way using SQL or Spark's DataFrame API?
Also, how can I aggregate that table by X rows per span to get, for example, the percentage availability for the span from time 0 to X, X to 2X, etc.?
edit:
For context, each group of X rows represents a day, and the whole data set represents a week. So I want to aggregate the availability per day, per span.
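A minimal sketch of the per-day aggregation follows. It uses plain Scala collections rather than Spark so it runs on its own. It assumes Time values are consecutive integers starting at 0, so the integer division Time / X puts exactly X rows into each group (day). The Reading case class, the availabilityPerGroup function, and the sample rows beyond the first six (which match the table above) are hypothetical. In Spark SQL, the same grouping key could be written as FLOOR(Time / X) and grouped together with Span, with AVG(Available) giving the availability fraction.

```scala
// Sketch with plain Scala collections (no Spark). Assumes Time values are
// consecutive integers starting at 0, so time / x assigns exactly x rows to each group.
final case class Reading(span: String, available: Int, time: Int)

object DailyAvailability {
  // Returns (span, group index, fraction of readings that were available),
  // sorted by group index and then by span.
  def availabilityPerGroup(rows: Seq[Reading], x: Int): Seq[(String, Int, Double)] =
    rows
      .groupBy(r => (r.span, r.time / x)) // key: (span, day index)
      .toSeq
      .map { case ((span, day), rs) =>
        (span, day, rs.map(_.available).sum.toDouble / rs.size)
      }
      .sortBy { case (span, day, _) => (day, span) }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample: 3 spans, times 0..3, x = 2 (two "days" of two readings each)
    val rows = Seq(
      Reading("A", 0, 0), Reading("B", 1, 0), Reading("C", 1, 0),
      Reading("A", 1, 1), Reading("B", 0, 1), Reading("C", 1, 1),
      Reading("A", 1, 2), Reading("B", 1, 2), Reading("C", 0, 2),
      Reading("A", 1, 3), Reading("B", 0, 3), Reading("C", 0, 3)
    )
    availabilityPerGroup(rows, 2).foreach(println)
  }
}
```

With x = 2 this prints (A,0,0.5), (B,0,0.5), (C,0,1.0), (A,1,1.0), (B,1,0.5), (C,1,0.0), one per line.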
edit:
Also, I know what X is, so I want to be able to say something like GROUP BY Span LIMIT X ORDER BY Time.
edit:
As a final attempt to describe this better, I want the first X of the first span, then the first X of the next span, and then the first X of the last span, followed by the next X of the first span, the next X of the second span, etc., through to the last rows for each span.
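The interleaved order described above amounts to sorting on a composite key: first the group index Time / X, then Span, then Time. Here is a plain-Scala sketch. It assumes integer times starting at 0 and spans that should appear in alphabetical order. Rows are (span, available, time) tuples, and the interleave name and sample rows are hypothetical.

```scala
object InterleavedOrder {
  // Orders rows so that each span's first x times come first (span by span),
  // then each span's next x times, and so on.
  // Assumes integer times starting at 0 and alphabetical span order.
  def interleave(rows: Seq[(String, Int, Int)], x: Int): Seq[(String, Int, Int)] =
    rows.sortBy { case (span, _, time) => (time / x, span, time) }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows: (span, available, time)
    val rows = Seq(
      ("A", 0, 0), ("B", 1, 0), ("C", 1, 0), ("A", 1, 1), ("B", 0, 1), ("C", 1, 1),
      ("A", 1, 2), ("B", 0, 2), ("C", 1, 2), ("A", 0, 3), ("B", 1, 3), ("C", 0, 3)
    )
    interleave(rows, 2).foreach(println)
  }
}
```

On a DataFrame, the same key should carry over roughly as df.orderBy(floor(col("Time") / x), col("Span"), col("Time")), using org.apache.spark.sql.functions.{col, floor}. The aggregation itself does not depend on this ordering, only on the grouping key.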
Assuming that your Time column contains a timestamp, so that your input data looks something like this example RDD:
val rdd = sc.parallelize(List(("A", 0, "2015-01-02 09:00:00"),
("A", 1, "2015-01-02 10:00:00"),
("A", 1, "2015-01-02 11:00:00"),
("B", 0, "2015-01-02 09:00:00"),
("B", 0, "2015-01-02 10:00:00"),
("B", 1, "2015-01-02 11:00:00"),
("A", 1, "2015-01-03 09:00:00"),
("A", 1, "2015-01-03 10:00:00"),
("A", 1, "2015-01-03 11:00:00"),
("B", 0, "2015-01-03 09:00:00"),
("B", 0, "2015-01-03 10:00:00"),
("B", 0, "2015-01-03 11:00:00")
))
you could achieve your grouping and aggregation like this:
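rdd.map { case (span, availability, timestamp) =>                // key each reading by (span, day)
    ((span, getDate(timestamp)), (List((availability, timestamp)), availability, 1)) }
  .reduceByKey((v1, v2) => (v1._1 ++ v2._1, v1._2 + v2._2, v1._3 + v2._3)) // merge readings, sum available and total counts
  .mapValues(v => (v._1, v._2.toDouble / v._3))                    // available count / total count
  .map { case ((span, _), (readings, ratio)) => (span, readings.sortBy(_._2), ratio) } // drop the date from the key, order readings by time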
(Here, getDate() is some function that returns the date from a timestamp.)
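The answer leaves getDate() unspecified. One possible version is below. It assumes the timestamps are strings in the yyyy-MM-dd HH:mm:ss format used in the example RDD, and the DateHelper object name is hypothetical.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object DateHelper {
  // Hypothetical helper: assumes timestamps are strings like "2015-01-02 09:00:00"
  private val format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Returns the calendar date part as an ISO-8601 string, e.g. "2015-01-02"
  def getDate(timestamp: String): String =
    LocalDateTime.parse(timestamp, format).toLocalDate.toString

  def main(args: Array[String]): Unit =
    println(getDate("2015-01-02 09:00:00"))
}
```

Returning the date as a string keeps the (span, date) key simple to compare and print. Parsing, rather than taking a substring, also rejects malformed timestamps instead of silently grouping them.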
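This produces records in the format (span, List((availability, time)), availability_ratio), where the ratio is the fraction of that day's readings in which the span was available. For my example RDD, the result looks like this (the order of the records may vary):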
(B,List((0,2015-01-02 09:00:00), (0,2015-01-02 10:00:00), (1,2015-01-02 11:00:00)),0.3333333333333333)
(A,List((0,2015-01-02 09:00:00), (1,2015-01-02 10:00:00), (1,2015-01-02 11:00:00)),0.6666666666666666)
(A,List((1,2015-01-03 09:00:00), (1,2015-01-03 10:00:00), (1,2015-01-03 11:00:00)),1.0)
(B,List((0,2015-01-03 09:00:00), (0,2015-01-03 10:00:00), (0,2015-01-03 11:00:00)),0.0)