Mikhail Dubkov
Mikhail Dubkov

Reputation: 1233

Spark Structured Streaming: join stream with data that should be read every micro batch

I have a stream from HDFS and I need to join it with my metadata that is also in HDFS, both Parquets.

My metadata sometimes got updated and I need to join with fresh and most recent, that means read metadata from HDFS every stream micro batch ideally.

I tried to test this, but unfortunately Spark reads metadata once that cache files(supposedly), even if I tried with spark.sql.parquet.cacheMetadata=false.

Is there a way how to read every micro batch? Foreach Writer is not what I'm looking for?

Here's code examples:

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.sql("SET spark.sql.parquet.cacheMetadata=false")

val stream = spark.readStream.parquet("/tmp/streaming/")

val metadata = spark.read.parquet("/tmp/metadata/")

val joinedStream = stream.join(metadata, Seq("id"))

joinedStream.writeStream.option("checkpointLocation", "/tmp/streaming-test/checkpoint").format("console").start()



/tmp/metadata/ got updated with spark append mode.

As far as I understand, with metadata accessing through JDBC jdbc source and spark structured streaming, Spark will query each micro batch.

Upvotes: 0

Views: 1055

Answers (1)

Mikhail Dubkov
Mikhail Dubkov

Reputation: 1233

As far as I found, there are two options:

  1. Create temp view and refresh it using interval:

    metadata.createOrReplaceTempView("metadata")

and trigger refresh in separate thread:

spark.catalog.refreshTable("metadata")

NOTE: in this case spark will read the same path only, it does not work if you need read metadata from different folders on HDFS, e.g. with timestamps etc.

  1. Restart stream with interval as Tathagata Das suggested

This way is not suitable for me, since my metadata might be refreshed several times per hour.

Upvotes: 1

Related Questions