Linford Bacon

Reputation: 323

Preventing Spark from storing state in stream/stream joins

I have two streaming datasets, let's call them fastStream and slowStream.

The fastStream is a streaming dataset that I am consuming from Kafka via the structured streaming API. I am expecting to receive potentially thousands of messages a second.

The slowStream is actually a reference (or lookup) table that is being 'upserted' by another stream and contains data that I want to join onto each message in the fastStream before I save the records to a table. The slowStream is only updated when someone changes the metadata, which can happen at any time but which we would expect to happen maybe once every few days.

Each record in the fastStream will have exactly one corresponding message in the slowStream and I essentially want to make that join happen immediately with whatever data is in the slowStream table. I don't want to wait to see if a potential match could occur if new data arrives in the slowStream.

The problem that I have is that according to the Spark docs:

Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results.

I have tried adding a watermark to the fastStream, but I think it has no effect since the docs indicate that the watermarked columns need to be referenced in the join condition.

Ideally I would write something like:

# Apply a watermark to the fast stream
fastStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/fastStream") \
.withWatermark("timestamp", "1 hour") \
.alias("fastStream")

# The slowStream cannot be watermarked since it is only slowly changing
slowStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/slowStream") \
.alias("slowStream")

# Prevent the join from buffering the fast stream by 'telling' spark that there will never be new matches.
fastStream.join( 
  slowStream,
  expr(""" 
    fastStream.slow_id = slowStream.id
    AND fastStream.timestamp > watermark
    """
  ),
  "inner"
).select("fastStream.*", "slowStream.metadata")

But I don't think you can reference the watermark in the SQL expression.

Essentially, while I'm happy to have the slowStream buffered (so the whole table is in memory) I can't have the fastStream buffered as this table will quickly consume all memory. Instead, I would simply like to drop messages from the fastStream that aren't matched instead of retaining them to see if they might match in future.

Any help very gratefully appreciated.

Upvotes: 3

Views: 2254

Answers (3)

Linford Bacon

Reputation: 323

Answering my own question with what I ended up going with. It's certainly not ideal, but for all my searching, there doesn't seem to be a way within Spark Structured Streaming to address this use case.

So my solution was to read the dataset and conduct the join inside a foreachBatch. This way I prevent Spark from storing a ton of unnecessary state, and the joins are conducted immediately. On the downside, there seems to be no way to incrementally read a streamed table, so instead I am re-reading the entire table every time...

def join_slow_stream(df, batchId):

  # Read the reference data as a static table rather than a stream
  slowdf = spark.read \
    .format("delta") \
    .load("dbfs:/mnt/some_file/slowStream") \
    .alias("slowStream")

  # Alias the micro-batch so the join expression can reference it by name
  out_df = df.alias("fastStream").join(
    slowdf,
    expr("fastStream.slow_id = slowStream.id"),
    "inner"
  ).select("fastStream.*", "slowStream.metadata")

  # write data to database
  db_con.write(out_df)


fastStream.writeStream.foreachBatch(join_slow_stream).start()

Upvotes: 2

Michael Heil

Reputation: 18495

For inner stream-stream joins, watermarking and event-time constraints (in the join condition) are optional.

If unbounded state is not an issue for you in terms of volume, you can choose not to specify them. In that case all data will be buffered, and your data from the fastStream will immediately be joined with all the data from the slowStream.

Only when both parameters are specified will your state be cleaned up. Note the purpose of those two parameters:

  • Event-time constraint (time range join condition): What is the maximum time range between the generation of the two events at their respective sources?

  • Watermark: What is the maximum duration an event can be delayed in transit between the source and the processing engine?

To define the two parameters you first need to answer the above-mentioned questions (which are quoted from the book "Learning Spark, 2nd edition", published by O'Reilly).
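As a sketch, here is roughly where the two parameters would go in a stream-stream join. The column names and paths follow the question; the "2 hours" event-time bound and the slowStream watermark are placeholder values you would derive by answering the two questions above:

```python
# Sketch only: where the watermark and event-time constraint go in a
# bounded-state stream-stream join. The bounds below are illustrative
# assumptions, not values taken from the question.

FAST_WATERMARK = "1 hour"    # max expected delay of fastStream events
SLOW_WATERMARK = "2 hours"   # max expected delay of slowStream events

# Event-time constraint: how far apart matching events' timestamps may be.
JOIN_CONDITION = """
    fastStream.slow_id = slowStream.id
    AND slowStream.timestamp >= fastStream.timestamp - interval 2 hours
    AND slowStream.timestamp <= fastStream.timestamp + interval 2 hours
"""

def bounded_state_join(spark):
    # pyspark imported locally so the sketch parses without Spark installed
    from pyspark.sql.functions import expr

    fast = (spark.readStream.format("delta")
            .load("dbfs:/mnt/some_file/fastStream")
            .withWatermark("timestamp", FAST_WATERMARK)
            .alias("fastStream"))

    slow = (spark.readStream.format("delta")
            .load("dbfs:/mnt/some_file/slowStream")
            .withWatermark("timestamp", SLOW_WATERMARK)
            .alias("slowStream"))

    # With both watermarks and the time-range condition in place,
    # Spark can age out buffered rows instead of keeping them forever.
    return fast.join(slow, expr(JOIN_CONDITION), "inner")
```

Note this still buffers the fastStream for the duration of the time bound; it limits the state, it does not eliminate it.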

Regarding your code comment:

"Prevent the join from buffering the fast stream by 'telling' spark that there will never be new matches."

Remember that buffering in a stream-stream join is necessary. Otherwise, you would only be able to join the data that is available within the current micro-batch. As the slowStream does not receive regular updates while the fastStream is updating its data quite fast, you would probably never get any join matches at all without buffering the data.

Overall, for the use case you are describing ("Join fast changing data with slow changing metadata") it is usually better to use a stream-static join approach where the slow changing data becomes the static part.

In a stream-static join, every row in the stream data is joined with the full static data, and the static table is reloaded in every single micro-batch. If loading the static table reduces your performance, you may think about caching it and having it updated regularly, as described in Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically.
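A minimal sketch of that stream-static join, assuming the Delta paths and column names from the question (the write side is omitted):

```python
# Sketch of a stream-static join; paths and column names are taken from
# the question, everything else is illustrative.

STATIC_JOIN_EXPR = "fastStream.slow_id = slowStream.id"

def stream_static_join(spark):
    # pyspark imported locally so the sketch parses without Spark installed
    from pyspark.sql.functions import expr

    # Batch (static) read: re-resolved for every micro-batch, so Delta
    # upserts to the slow table are picked up automatically.
    slow = (spark.read.format("delta")
            .load("dbfs:/mnt/some_file/slowStream")
            .alias("slowStream"))

    fast = (spark.readStream.format("delta")
            .load("dbfs:/mnt/some_file/fastStream")
            .alias("fastStream"))

    # No streaming state is kept for the join: each fast record is matched
    # against whatever the static table contains right now, then dropped.
    return (fast.join(slow, expr(STATIC_JOIN_EXPR), "inner")
                .select("fastStream.*", "slowStream.metadata"))
```

This matches the semantics asked for in the question: join immediately against the current contents of the slow table, and never wait for a future match.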

Upvotes: 7

ggordon

Reputation: 10035

If you are interested in referencing the "time that was watermarked" i.e. the 1 hour, you may replace watermark in the expression with current_timestamp - interval '1' hour.
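For example, the question's join condition could be rewritten as below (a sketch only; whether this processing-time condition actually lets Spark drop state is a separate question):

```python
# Hypothetical rewrite of the question's join condition: the unavailable
# `watermark` reference is replaced by processing-time arithmetic.
PROCESSING_TIME_CONDITION = """
    fastStream.slow_id = slowStream.id
    AND fastStream.timestamp > current_timestamp() - interval 1 hour
"""

def build_condition():
    # pyspark imported locally so the snippet parses without Spark installed
    from pyspark.sql.functions import expr
    return expr(PROCESSING_TIME_CONDITION)
```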

Since you are attempting to join two streams, Spark will insist that both use watermarks.

Reference

Upvotes: 0
