I'm just starting to work with Spark in streaming mode, and I want to do the following.
I want to write a Python script that uses my Spark cluster to read streamed data from an S3 bucket (continuously watching for new .csv files), which contains a substantial amount of data. After that I want to apply some transformations (just regular filtering and UDFs, without any aggregations) and write the result to a final table, also in streaming mode, so that new data is appended at the end. Let's call this in-memory table X. I don't want X to contain all the data generated during the entire time the job has been running: I don't need it, and the volume would be huge, which I suppose could lead to an OOM. I only need the data from the last 15 minutes (table X has a field called event_at_timestamp). In other words, I need a processed snapshot of the last 15 minutes of my S3 stream.
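To make the requirement concrete, here is a minimal plain-Python sketch (no Spark involved) of the behavior I would like table X to have. The class name RecentRows and its methods are hypothetical, made up only for this illustration, and the sketch assumes rows arrive roughly in event-time order:

```python
from collections import deque
from datetime import datetime, timedelta


class RecentRows:
    """Hypothetical stand-in for table X: keeps only rows newer than `retention`."""

    def __init__(self, retention=timedelta(minutes=15)):
        self.retention = retention
        self.rows = deque()  # assumed to be appended in event-time order

    def append(self, row, now):
        """Add a processed row, then evict everything older than now - retention."""
        self.rows.append(row)
        self.evict(now)

    def evict(self, now):
        cutoff = now - self.retention
        # Rows are assumed ordered by event time, so only the left end can expire.
        while self.rows and self.rows[0]["event_at_timestamp"] < cutoff:
            self.rows.popleft()


x = RecentRows()
start = datetime(2024, 1, 1, 12, 0, 0)
for minute in (0, 5, 10, 20):
    ts = start + timedelta(minutes=minute)
    x.append({"name_of_the_event": "event_log_1", "event_at_timestamp": ts}, now=ts)

# Once the 12:20 row arrives, the 12:00 row falls outside the 15-minute window.
remaining_minutes = [r["event_at_timestamp"].minute for r in x.rows]
print(remaining_minutes)
```

What I'm looking for is the streaming equivalent of that evict step, applied to X as new rows keep arriving.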
I know there is a concept of watermarking. It looks very similar to what I need (check the time when an event was generated and process it accordingly), but it seems it can't automatically delete rows from my table according to the watermark. I'm really confused by this: Spark and watermarking can do much more sophisticated things, yet they don't seem to cover my case. I also know that watermarking works with aggregations, which is close to what I need, but I don't want to add artificial aggregations, since I don't have any in my transformation pipeline.
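As far as I understand it, the watermark is roughly "the maximum event time seen so far minus a delay threshold", and it is used to decide which late rows and which old state can be dropped inside stateful operators, not to remove rows already written to a sink. Here is a toy plain-Python model of that idea. The function simulate_watermark is hypothetical and is not Spark's implementation, just my mental picture of it:

```python
from datetime import datetime, timedelta


def simulate_watermark(batches, delay=timedelta(minutes=15)):
    """Toy model: after each batch, watermark = max event time seen - delay."""
    watermark = None
    max_event_time = None
    history = []
    for batch in batches:
        # Rows older than the watermark left by earlier batches count as "too late".
        late = [ts for ts in batch if watermark is not None and ts < watermark]
        for ts in batch:
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts
        if max_event_time is not None:
            watermark = max_event_time - delay
        history.append((watermark, late))
    return history


t0 = datetime(2024, 1, 1, 12, 0, 0)
batches = [
    [t0, t0 + timedelta(minutes=10)],
    [t0 + timedelta(minutes=30)],
    [t0 + timedelta(minutes=5), t0 + timedelta(minutes=40)],
]
history = simulate_watermark(batches)
final_watermark, late_in_last_batch = history[-1]
```

In this model the 12:05 row in the last batch is behind the 12:15 watermark and would be treated as late, but nothing here touches rows that are already stored in X, which is exactly the part I'm missing.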
Code example here:
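from pyspark.sql import functions as F
from pyspark.sql import types as T

# `spark` is an existing SparkSession; `date` and `hour` are assumed to be set elsewhere
path = f's3a://my_bucket/events/streaming/date={date}/hour={hour}'
events_list = ['event_log_1', 'event_log_2']
csv_schema = T.StructType(
    [
        T.StructField("name_of_the_event", T.StringType(), True),
        T.StructField("event_at_timestamp", T.TimestampType(), True),
        # schema...doesn't matter...
    ]
)
# read new CSV files from the S3 path as a stream and keep only the events of interest
spark_stream = spark.readStream.format('csv') \
    .options(header='false', delimiter=",", escape='"') \
    .option("startingPosition", "latest") \
    .option("latestFirst", "true") \
    .schema(csv_schema) \
    .load(path) \
    .filter(F.col("name_of_the_event").isin(events_list))
# placeholder for the regular filter/UDF transformations (details don't matter)
df = spark_stream.make_some_regular_spark_processing_like_filter_some_udf_transformations_also_doesnt_matter()
# write the processed stream to an in-memory table
df.writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("streaming_table") \
    .start()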
I found a really similar topic here, but it has no good answers. I don't know why; to me this seems like a really good and common use case, doesn't it? Please don't refer me to custom sinks (i.e. custom targets to write to that are able to maintain state).
Thanks in advance. Will appreciate any help.
P.S.
I've tried some sql statements like
spark.sql("DELETE FROM X WHERE event_at_timestamp < current_timestamp() - INTERVAL 15 MINUTES")
which clearly doesn't help, since streaming transformations only process the new data in the stream.
I've also tried the mapGroupsWithState approach, but it didn't help either.
Lastly, I've tried batch jobs and the .trigger(processingTime="10 seconds") approach, but again, I need pure streaming.