Reputation: 184
There is a continuous stream of data; after all transformations it has the following schema:
root
|-- message_id: string (nullable = true)
|-- device_id: string (nullable = true)
|-- metric_id: long (nullable = true)
|-- value: long (nullable = true)
|-- timestamp: string (nullable = true)
There is also a set of rules, e.g.:
if metric_id = 4077 and value > 10 and value < 25
That means that if any row in the stream meets that condition, that row must be pushed into a different stream.
How can I identify messages that meet the alert criteria (there are several) and then push them to a different stream?
Upvotes: 4
Views: 1868
Reputation: 310
According to the docs, the best way to do this is with foreachBatch; writing to multiple locations is exactly one of the use cases described there. Structured Streaming - ForEachBatch docs
So, reusing the example above, you would do something like this:
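df = spark.readStream.format(...).options(...).load().select(...)

# Use foreachBatch to write each micro-batch to two outputs.
# The filter is applied inside the foreachBatch function here,
# but it could also be applied outside.
def write_to_both(batch_df, batch_id):
    batch_df.persist()  # cache so the batch is not recomputed for each write
    batch_df.write.format(...).options(...).save()  # location1: all rows
    alerts = batch_df.filter(
        (batch_df.metric_id == 4077) & (batch_df.value > 10) & (batch_df.value < 25)
    )
    alerts.write.format(...).options(...).save()  # location2: alerts only
    batch_df.unpersist()

(df.writeStream
    .foreachBatch(write_to_both)
    .start()
    .awaitTermination())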
I hope this helps, thanks!
Upvotes: 0
Reputation: 18495
Spark Structured Streaming applications allow you to have multiple output streams using the same input stream.
That means that if, for example, df is your input streaming DataFrame, you could just define a DataFrame filter and use the resulting filtered DataFrame for another output stream, as below:
df = spark.readStream.format(...).options(...).load().select(...)

# create a new DataFrame that only contains alerts
alertsDf = df.filter((df.metric_id == 4077) & (df.value > 10) & (df.value < 25))

# use both DataFrames for output streams, each with its own checkpoint location
df.writeStream.format(...).options(...).option("checkpointLocation", ...).start()
alertsDf.writeStream.format(...).options(...).option("checkpointLocation", ...).start()

# block until any of the running queries terminates
spark.streams.awaitAnyTermination()
For fault tolerance, it is recommended to set the checkpointLocation option separately for each output stream; two queries should not share the same checkpoint directory.
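The question mentions several alert rules. One way to handle that, sketched here under the assumption that every rule has the same shape as the example (a metric_id plus exclusive lower and upper bounds on value), is to keep the rules as data and OR together one condition per rule. The tuple rule format, RULES, and build_alert_condition are hypothetical names for illustration. Because the function only uses ==, >, <, & and |, the same code works on plain Python values (shown here) and on PySpark Column objects.

```python
from functools import reduce
import operator
from typing import Callable, Iterable, Tuple

# Hypothetical rule format: (metric_id, lower_bound, upper_bound), bounds exclusive.
Rule = Tuple[int, int, int]


def build_alert_condition(get: Callable, rules: Iterable[Rule]):
    """OR together one (metric_id == m) & (lo < value < hi) term per rule.

    `get(name)` returns a column-like value: pass pyspark.sql.functions.col
    for a Spark filter, or row.__getitem__ for a plain dict row.
    `rules` must be non-empty (reduce has no initial value here).
    """
    terms = [
        (get("metric_id") == m) & (get("value") > lo) & (get("value") < hi)
        for m, lo, hi in rules
    ]
    return reduce(operator.or_, terms)


# Usage with plain dict rows; the second rule is a hypothetical example.
RULES = [(4077, 10, 25), (4078, 0, 5)]
rows = [
    {"metric_id": 4077, "value": 15},
    {"metric_id": 4077, "value": 30},
    {"metric_id": 4078, "value": 3},
    {"metric_id": 9999, "value": 15},
]
alerts = [r for r in rows if build_alert_condition(r.__getitem__, RULES)]
print(alerts)  # [{'metric_id': 4077, 'value': 15}, {'metric_id': 4078, 'value': 3}]
```

With PySpark, the same function could be given pyspark.sql.functions.col, e.g. alertsDf = df.filter(build_alert_condition(col, RULES)), so adding a rule means adding a tuple rather than editing the filter expression by hand.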
Upvotes: 3