Reputation: 9537
This question is a follow up of this answer. Spark is displaying an error when the following situation arises:
# Group results in 12 second windows of "foo", then by integer buckets of 2 for "bar"
fooWindow = window(col("foo"), "12 seconds"))
# A sub bucket that contains values in [0,2), [2,4), [4,6]...
barWindow = window(col("bar").cast("timestamp"), "2 seconds").cast("struct<start:bigint,end:bigint>")
results = df.groupBy(fooWindow, barWindow).count()
The error is:
"Multiple time window expressions would result in a cartesian product of rows, therefore they are currently not supported."
Is there some way to achieve the desired behavior?
Upvotes: 4
Views: 713
Reputation: 9537
I was able to come up with a solution using an adaptation of this SO answer.
Note: This solution only works if there is at most one call to window
, meaning multiple time windows are not allowed. Doing a quick search on the spark github shows there's a hard limit of <= 1
windows.
By using withColumn
to define the buckets for each row, we can then group by that new column directly:
from pyspark.sql import functions as F
from datetime import datetime as dt, timedelta as td
start = dt.now()
second = td(seconds=1)
data = [(start, 0), (start+second, 1), (start+ (12*second), 2)]
df = spark.createDataFrame(data, ('foo', 'bar'))
# Create a new column defining the window for each bar
df = df.withColumn("barWindow", F.col("bar") - (F.col("bar") % 2))
# Keep the time window as is
fooWindow = F.window(F.col("foo"), "12 seconds").start.alias("foo")
# Use the new column created
results = df.groupBy(fooWindow, F.col("barWindow")).count().show()
# +-------------------+---------+-----+
# | foo|barWindow|count|
# +-------------------+---------+-----+
# |2019-01-24 14:12:48| 0| 2|
# |2019-01-24 14:13:00| 2| 1|
# +-------------------+---------+-----+
Upvotes: 5