Reputation: 43
I'm using pyspark to process some streaming data coming in and I want to add a new column to my data frame with a 50-second moving average.
I tried using a Window spec with rangeBetween:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = (Window
     .partitionBy(F.col("sender"))
     .orderBy(F.col("event_time").cast("long"))
     .rangeBetween(-50, 0))
df2 = df.withColumn("rolling_average", F.avg("fr").over(w))
But this gives me an error, as structured streaming requires a time-based window (probably to manage state):
AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets
Using the sql.window function I can also calculate a moving average, but that gives me the results grouped by a tumbling (or hopping) window together with the unique id key called sender:
df.select('sender', 'event_time', 'fr').groupBy("sender", window("event_time", "50 second")).avg().alias('avg_fr')
| sender | window | avg(fr) |
|---|---|---|
| 59834cfd-6cb2-4ece-8353-0a9b20389656 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.17443667352199554 |
| 8b5d90b9-65d9-4dd2-b742-31c4f0ce37d6 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.010564474388957024 |
| a74204f3-e25d-4737-a302-9206cd69e90a | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.16375258564949036 |
| db16426d-a9ba-449b-9777-3bdfadf0e0d9 | {"start":"2021-04-12T09:57:30.000+0000","end":"2021-04-12T09:58:20.000+0000"} | 0.17516431212425232 |
The tumbling window is obviously not what I want and I would need to somehow join this to the original table again. I'm not sure how to define a sliding window based on the irregular event timestamps coming in.
Right now I'm thinking about writing a stateful function that stores the previously received records in a state and updates it for each new data point coming in. But this seems quite elaborate for such a common task, which I expect can be done in an easier way.
EDIT: the current version of Spark (3.1.1) only allows arbitrary stateful functions to be written in Java or Scala, not Python, because of the conversion to the JVM.
Any thoughts if this is actually the correct way to go?
Upvotes: 3
Views: 3389
Reputation: 18475
You are getting the exception because it looks like you are building a Window meant for batch processing, not for a streaming DataFrame.
The Structured Streaming Programming Guide, in the section Window Operations on Event Time, gives an example that can be applied to your use case:
from pyspark.sql.functions import window

streamDf = ...  # streaming DataFrame of schema { event_time: Timestamp, sender: String, fr: Integer }

# Group the data by sliding window and sender and compute the average of each group
movingAverageDf = streamDf.groupBy(
    window(streamDf.event_time, "50 seconds", "5 seconds"),
    streamDf.sender
).avg("fr")
Keep in mind that without a watermark the internal state of your application will grow indefinitely, so it is recommended to also add one. Make sure the watermark uses the same event-time column as the window.
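A minimal sketch of how that could look for the snippet above (the 2-minute late-data tolerance is just an illustrative assumption):
from pyspark.sql.functions import window

# Watermark on the same event-time column that the window uses;
# the "2 minutes" threshold is an illustrative assumption.
movingAverageDf = (streamDf
    .withWatermark("event_time", "2 minutes")
    .groupBy(window(streamDf.event_time, "50 seconds", "5 seconds"),
             streamDf.sender)
    .avg("fr"))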
Another note on the output mode of your streaming query: have a look at the overview in Output Modes to understand which modes are supported for this kind of query.
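For instance, a windowed aggregation with a watermark supports the update and append output modes; a sketch with a console sink for testing:
query = (movingAverageDf
    .writeStream
    .outputMode("update")   # "append" only emits a window once the watermark passes its end
    .format("console")      # console sink just for testing
    .start())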
Upvotes: 2
Reputation: 43
As requested by mike, here is a minimal reproducible example, first for the non-streaming case:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
import datetime
rawData = [(1, "A", "2021-04-15T14:31:45.000", 1, 4.0),
(2, "A", "2021-04-15T14:32:46.000", 3, 5.0),
(3, "B", "2021-04-15T14:32:16.000", 8, 100.0),
(4, "B", "2021-04-15T14:33:16.000", 10, 200.0),
(5, "A", "2021-04-15T14:32:16.000", 2, -3.0),
(6, "B", "2021-04-15T14:32:47.000", 11, -500.0),
(7, "A", "2021-04-15T14:33:17.000", 0, 2.0)]
df = spark.createDataFrame(rawData).toDF("index", "sender", "event_time", "value1", "value2")
df = df.select(df["event_time"].astype("Timestamp").alias("ts"), "sender", "value1", "value2")
print(df.schema)
display(df)
| ts | sender | value1 | value2 |
|------------------------------|--------|--------|--------|
| 2021-04-15T14:31:45.000+0000 | A | 1 | 4 |
| 2021-04-15T14:32:46.000+0000 | A | 3 | 5 |
| 2021-04-15T14:32:16.000+0000 | B | 8 | 100 |
| 2021-04-15T14:33:16.000+0000 | B | 10 | 200 |
| 2021-04-15T14:32:16.000+0000 | A | 2 | -3 |
| 2021-04-15T14:32:47.000+0000 | B | 11 | -500 |
| 2021-04-15T14:33:17.000+0000 | A | 0 | 2 |
To add the moving average as a new column to this table, I first add a new column with the timestamp rounded up to 10-second resolution, to avoid generating a window for every second, which seemed very inefficient. I also use a watermark of 2 minutes to drop late data.
@udf(returnType=TimestampType())
def round_time(dt=None, round_to=10):
    # Round the timestamp up to the next multiple of `round_to` seconds,
    # leaving it unchanged when it already falls on a boundary.
    if dt.second % round_to == 0:
        s = dt.second
    else:
        s = (math.floor(dt.second / round_to) + 1) * round_to
    y = dt + datetime.timedelta(seconds=s - dt.second)
    return y
df = df.withWatermark("ts", "2 minutes").select("*", round_time(df["ts"]).alias("trunct_time"))
display(df)
| ts | sender | value1 | value2 | trunct_time |
|------------------------------|--------|--------|--------|------------------------------|
| 2021-04-15T14:31:45.000+0000 | A | 1 | 4 | 2021-04-15T14:31:50.000+0000 |
| 2021-04-15T14:32:46.000+0000 | A | 3 | 5 | 2021-04-15T14:32:50.000+0000 |
| 2021-04-15T14:32:16.000+0000 | B | 8 | 100 | 2021-04-15T14:32:20.000+0000 |
| 2021-04-15T14:33:16.000+0000 | B | 10 | 200 | 2021-04-15T14:33:20.000+0000 |
| 2021-04-15T14:32:16.000+0000 | A | 2 | -3 | 2021-04-15T14:32:20.000+0000 |
| 2021-04-15T14:32:47.000+0000 | B | 11 | -500 | 2021-04-15T14:32:50.000+0000 |
| 2021-04-15T14:33:17.000+0000 | A | 0 | 2 | 2021-04-15T14:33:20.000+0000 |
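As a side note, the same rounding can probably be done without a Python UDF by working on the epoch seconds with built-in functions; a hedged sketch that should be equivalent for the whole-second timestamps used here (df_no_udf is just an illustrative name):
from pyspark.sql import functions as F

# Round the timestamp up to the next 10-second boundary using built-in
# functions only (assumes whole-second timestamps, as in this example).
df_no_udf = df.withColumn(
    "trunct_time",
    (F.ceil(F.col("ts").cast("long") / 10) * 10).cast("timestamp")
)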
Now I calculate the moving average over 50-second sliding windows that move in 10-second increments.
avgDF = (df.withWatermark("ts", "2 minutes")
           .select("value1", "sender", "ts")
           .groupBy("sender", window("ts", "50 second", "10 second"))
           .avg())
avgDF = avgDF.withColumn("window_end", avgDF.window.end).withColumnRenamed("sender", "sender2")
display(avgDF)
| sender2 | window | avg(value1) | window_end |
|---------|-------------------------------------------------------------------------------|-------------|------------------------------|
| A | {"start":"2021-04-15T14:31:10.000+0000","end":"2021-04-15T14:32:00.000+0000"} | 1 | 2021-04-15T14:32:00.000+0000 |
| A | {"start":"2021-04-15T14:31:00.000+0000","end":"2021-04-15T14:31:50.000+0000"} | 1 | 2021-04-15T14:31:50.000+0000 |
| A | {"start":"2021-04-15T14:31:20.000+0000","end":"2021-04-15T14:32:10.000+0000"} | 1 | 2021-04-15T14:32:10.000+0000 |
| A | {"start":"2021-04-15T14:31:40.000+0000","end":"2021-04-15T14:32:30.000+0000"} | 1.5 | 2021-04-15T14:32:30.000+0000 |
| A | {"start":"2021-04-15T14:31:30.000+0000","end":"2021-04-15T14:32:20.000+0000"} | 1.5 | 2021-04-15T14:32:20.000+0000 |
| A | {"start":"2021-04-15T14:32:40.000+0000","end":"2021-04-15T14:33:30.000+0000"} | 1.5 | 2021-04-15T14:33:30.000+0000 |
| B | {"start":"2021-04-15T14:31:50.000+0000","end":"2021-04-15T14:32:40.000+0000"} | 8 | 2021-04-15T14:32:40.000+0000 |
| A | {"start":"2021-04-15T14:32:30.000+0000","end":"2021-04-15T14:33:20.000+0000"} | 1.5 | 2021-04-15T14:33:20.000+0000 |
Because the sliding window moves, we end up with additional rows in the aggregated table (only partially shown above).
Now we join the two tables back together:
joined_stream=df.join(
avgDF,
expr("""
trunct_time = window_end AND
sender = sender2
"""),
"leftOuter"
)
display(joined_stream.select('ts','sender','value1','value2','avg(value1)'))
|ts |sender|value1|value2|avg(value1)|
|----------------------------|------|------|------|-----------|
|2021-04-15T14:31:45.000+0000|A |1 |4 |1 |
|2021-04-15T14:32:46.000+0000|A |3 |5 |2.5 |
|2021-04-15T14:32:16.000+0000|B |8 |100 |8 |
|2021-04-15T14:33:16.000+0000|B |10 |200 |10.5 |
|2021-04-15T14:32:16.000+0000|A |2 |-3 |1.5 |
|2021-04-15T14:32:47.000+0000|B |11 |-500 |9.5 |
|2021-04-15T14:33:17.000+0000|A |0 |2 |1.5 |
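As a quick sanity check of one row: the record for sender A at 14:32:46 gets trunct_time 14:32:50, which matches the window ending at 14:32:50 (i.e. 14:32:00 to 14:32:50). That window contains A's records at 14:32:16 (value1 = 2) and 14:32:46 (value1 = 3), so avg(value1) = (2 + 3) / 2 = 2.5, as shown in the table.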
The final result is exactly what I wanted (*).
(*) However, the results may not be exactly as expected, since there can be a mismatch between the original timestamp resolution in seconds and the aggregation into 10-second chunks.
Upvotes: 1
Reputation: 43
For the streaming version I'm basically doing the same thing as in the non-streaming solution I posted above.
schema = StructType([
    StructField("ts", TimestampType(), True),
    StructField("sender", StringType(), True),
    StructField("value1", LongType(), True),
    StructField("value2", FloatType(), True)
])
df = spark.readStream.schema(schema).format("csv").load("dbfs:/FileStore/shared_uploads/[email protected]/raw_data*")
df = df.withWatermark("ts", "2 minutes").select("*", round_time(df["ts"]).alias("trunct_time"))
avgDF = (df.withWatermark("ts", "2 minutes")
           .select("value1", "sender", "ts")
           .groupBy("sender", window("ts", "50 second", "10 second"))
           .avg())
avgDF = (avgDF.withColumn("window_end", avgDF.window.end)
              .withColumnRenamed("sender", "sender2")
              .withWatermark("window_end", "2 minutes"))
joined_stream = df.join(
    avgDF,
    expr("""
        trunct_time = window_end AND
        sender = sender2
    """),
    "leftOuter"
)
query = (
joined_stream
.writeStream
.format("memory") # memory = store in-memory table (for testing only)
.queryName("joined") # joined = name of the in-memory table
.outputMode("append") # append = allows stream on stream joins
.start()
)
This results in the following error:
AnalysisException: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details. If you understand the possible risk of correctness issue and still need to run the query, you can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.;
The documentation mentions:
Any of the stateful operation(s) after any of below stateful operations can have this issue:
streaming aggregation in Append mode or stream-stream outer join
There’s a known workaround: split your streaming query into multiple queries per stateful operator, and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
But this is quite a cryptic description of how to solve the issue. Based on https://issues.apache.org/jira/browse/SPARK-28074:
It means split the queries into multiple steps with 1 stateful operation each and persist the intermediate results to topics. This produces mostly reproducible results. But of course it increases the overall delay of the messages passing through.
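One way to read that workaround, sketched below under assumptions (a replayable intermediate sink, here Delta paths; all paths, checkpoint locations and the query name are placeholders): write the aggregation to an intermediate sink in one query, then run the join as a second query that reads that sink back.
# Query 1: persist the windowed aggregation to an intermediate, replayable sink.
# The format, paths and checkpoint locations are placeholders / assumptions.
(avgDF.writeStream
      .format("delta")
      .option("checkpointLocation", "/tmp/checkpoints/avg")
      .outputMode("append")
      .start("/tmp/intermediate/avg"))

# Query 2: read the intermediate results back and run the join as its own query.
avgFromSink = (spark.readStream.format("delta").load("/tmp/intermediate/avg")
                    .withWatermark("window_end", "2 minutes"))

(df.join(avgFromSink,
         expr("trunct_time = window_end AND sender = sender2"),
         "leftOuter")
   .writeStream
   .format("memory")
   .queryName("joined_split")
   .outputMode("append")
   .start())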
Depending on your setup this may or may not be the correct solution, but for this example I decided to set the correctness-check parameter to false, so the query no longer throws an exception and only writes a warning to the logs.
%sql set spark.sql.streaming.statefulOperator.checkCorrectness.enabled=False
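The same flag can also be set from a Python cell instead of %sql:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")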
Now it gives me the result I wanted:
%sql select * from joined
| ts | sender | value1 | value2 | avg(value1) |
|------------------------------|--------|--------|--------|-------------|
| 2021-04-15T14:33:16.000+0000 | B | 10 | 200 | 10.5 |
| 2021-04-15T14:32:47.000+0000 | B | 11 | -500 | 9.5 |
| 2021-04-15T14:31:45.000+0000 | A | 1 | 4 | 1 |
| 2021-04-15T14:32:16.000+0000 | A | 2 | -3 | 1.5 |
| 2021-04-15T14:32:46.000+0000 | A | 3 | 5 | 2.5 |
| 2021-04-15T14:33:17.000+0000 | A | 0 | 2 | 1.5 |
| 2021-04-15T14:32:16.000+0000 | B | 8 | 100 | 8 |
One more caveat: these results only become visible once a new data point arrives that moves the watermark beyond the threshold (here 2 minutes). In a real streaming application this would not be an issue, but for this example I added an 8th data point a couple of minutes later, which of course isn't visible in the output for the same reason.
Upvotes: 0