Takito Isumoro

Reputation: 184

Use single streaming DataFrame for multiple output streams in PySpark Structured Streaming

There is a continuous stream of data; after all transformations it has the following schema:

root
 |-- message_id: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- metric_id: long (nullable = true)
 |-- value: long (nullable = true)
 |-- timestamp: string (nullable = true)

There is also a set of rules, e.g.:

if metric_id = 4077 and value > 10 and value < 25

That means that if any row in the stream meets that condition, that row must be pushed into a different stream.

How do I identify messages that meet the alert criteria (there are several) and then push them to a different stream?

Upvotes: 4

Views: 1868

Answers (2)

Luan Carvalho

Reputation: 310

Following the docs, the best way to do this is using foreachBatch; this is exactly the use case described there. Structured Streaming - ForEachBatch docs

So, reusing the above example, you would do something like this:

df = spark.readStream.format(...).options(...).load().select(...)

# use both DataFrames for output streams inside foreachBatch;
# the transformations are applied inside the foreachBatch scope here,
# but they could also happen outside
def write_outputs(batch_df, batch_id):
    batch_df.write.format(...).options(...).save()  # location1
    batch_df.filter(
        (batch_df.metric_id == "4077") & (batch_df.value > 10) & (batch_df.value < 45)
    ).write.format(...).options(...).save()  # location2

df.writeStream.foreachBatch(write_outputs).start().awaitTermination()
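Since the question mentions several alert rules, here is a minimal sketch of how multiple conditions could be combined inside the batch function. The rule table, sink paths, and the route_batch name below are hypothetical, not from the original question:

from pyspark.sql import functions as F

# hypothetical rule set: (metric_id, lower bound, upper bound)
RULES = [(4077, 10, 25), (5100, 0, 3)]

def route_batch(batch_df, batch_id):
    # OR the per-rule predicates together into a single alert condition
    alert_cond = F.lit(False)
    for metric, lo, hi in RULES:
        alert_cond = alert_cond | (
            (F.col("metric_id") == metric)
            & (F.col("value") > lo)
            & (F.col("value") < hi)
        )
    # full stream goes to the main sink, matching rows to the alert sink
    batch_df.write.mode("append").parquet("/data/all_metrics")  # hypothetical path
    batch_df.filter(alert_cond).write.mode("append").parquet("/data/alerts")  # hypothetical path

df.writeStream.foreachBatch(route_batch).start().awaitTermination()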

I hope I've helped, thanks!

Upvotes: 0

Michael Heil

Reputation: 18495

Spark Structured Streaming applications allow you to have multiple output streams using the same input stream.

That means, if for example df is your input streaming DataFrame, you could just define a filter and use the resulting, filtered DataFrame for another output stream, as below:

df = spark.readStream.format(...).options(...).load().select(...)

# create a new DataFrame that only contains alerts
alertsDf = df.filter( (df.metric_id == "4077") & (df.value > 10) & (df.value < 45) )

# use both DataFrames for output streams
df.writeStream.format(...).options(...).start()
alertsDf.writeStream.format(...).options(...).start()

spark.streams.awaitTermination()

For fault tolerance, it is recommended to set the option checkpointLocation for each output stream separately.
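For example (the sink format and paths below are hypothetical):

df.writeStream.format("parquet") \
    .option("path", "/data/all_metrics") \
    .option("checkpointLocation", "/tmp/checkpoints/all_metrics") \
    .start()

alertsDf.writeStream.format("parquet") \
    .option("path", "/data/alerts") \
    .option("checkpointLocation", "/tmp/checkpoints/alerts") \
    .start()

Because the two queries have separate checkpoint directories, each can recover its own progress independently after a restart.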

Upvotes: 3
