Reputation: 2616
I am looking for a way to trigger my Databricks notebook once to process a Kinesis stream, using the following pattern:
import org.apache.spark.sql.streaming.Trigger
// Load your Streaming DataFrame
val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")
// Perform transformations and then write…
sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")
It looks like this isn't possible with AWS Kinesis, and the Databricks documentation suggests the same. My question is: what else can we do to achieve that?
Upvotes: 5
Views: 1935
Reputation: 5285
Since Databricks Runtime (DBR) 13.3, it's possible to use Trigger.AvailableNow (.trigger(availableNow=True) in PySpark) to process data in batches from Kinesis.
Previously, we were running a cluster 24/7 with .trigger(processingTime="300 seconds") in order to process a new batch every 5 minutes.
Now, we are processing one batch at a time and then letting the run complete. And we have configured the job with a continuous trigger, so that as soon as the job completes, another one is triggered (but it takes a while for the cluster to start, so it doesn't run right away).
This has allowed us to reduce the costs for that job by over 50%. 💸
Bonus: We can now do other things in between the runs/batches, such as vacuuming the table (using an inventory table, of course).
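For illustration, here is a minimal PySpark sketch of such a run, assuming DBR 13.3+ and the Databricks Kinesis source. The function names, stream name, region, and paths are hypothetical placeholders, not taken from our actual job.

```python
def kinesis_source_options(stream_name, region, initial_position="trim_horizon"):
    """Build options for the Databricks Kinesis source (values are placeholders)."""
    return {
        "streamName": stream_name,
        "region": region,
        "initialPosition": initial_position,
    }


def run_available_now(spark, source_options, target_path, checkpoint_path):
    """Process whatever is available in the stream right now, then terminate."""
    stream_df = spark.readStream.format("kinesis").options(**source_options).load()
    query = (
        stream_df.writeStream.format("delta")
        # The checkpoint lets the next run resume where this one stopped.
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start(target_path)
    )
    # Returns once the available data has been processed and the query has stopped.
    query.awaitTermination()
    return query
```

In a notebook this would be called as something like run_available_now(spark, kinesis_source_options("my-stream", "us-east-1"), "/out/path", "/out/_checkpoint"), with maintenance steps placed after the call returns.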
Upvotes: 1
Reputation: 3290
A workaround is to stop after X micro-batches, without a trigger. This caps the number of rows processed per run. The only issue is that if you have millions of rows waiting in the queue, there is no guarantee that you will process all of them.
In Scala you can add an event listener; in Python, count the number of batches:
from time import sleep

s = sdf.writeStream.format("delta").start("/out/path")
# By default, Spark keeps the last spark.sql.streaming.numRecentProgressUpdates=100 progress updates in this list. Stop after 10 micro-batches.
# maxRecordsPerFetch is 10,000 by default, so we consume at most 10 x 10,000 = 100,000 messages per run.
while len(s.recentProgress) < 10:
    print("Batches #: " + str(len(s.recentProgress)))
    sleep(10)
s.stop()
You can add more advanced logic that counts the number of messages processed per batch and stops when the queue is empty (the throughput should drop once the backlog is consumed, since you'll only get the "real-time" flow, not the history).
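A rough sketch of that more advanced logic, assuming recentProgress yields dict-like progress entries with a numInputRows key (as in PySpark 3.x). The helper names, the threshold, and the polling limits are hypothetical and would need tuning against your real-time traffic.

```python
from time import sleep


def is_drained(recent_progress, idle_batches=3, threshold=0):
    """Return True when the last `idle_batches` updates each read <= threshold rows."""
    if len(recent_progress) < idle_batches:
        return False
    tail = recent_progress[-idle_batches:]
    return all(p["numInputRows"] <= threshold for p in tail)


def stop_when_drained(query, idle_batches=3, threshold=0, poll_seconds=10, max_polls=360):
    """Poll a running StreamingQuery and stop it once the backlog looks consumed."""
    for _ in range(max_polls):
        if not query.isActive:
            break
        if is_drained(query.recentProgress, idle_batches, threshold):
            break
        sleep(poll_seconds)
    # Also reached when max_polls is exhausted, so the run always ends.
    query.stop()
```

Setting threshold above zero accounts for a stream that keeps receiving a trickle of live records and therefore never reports an empty batch.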
Upvotes: 1
Reputation: 87249
As you mentioned in the question, Trigger.Once isn't supported for Kinesis.
But you can achieve what you need by adding Kinesis Data Firehose to the picture: it writes data from Kinesis into an S3 bucket (you can select the format you need, such as Parquet or ORC, or just leave it as JSON). You can then point the streaming job at that bucket and use Trigger.Once, since it's a normal file-based streaming source (for efficiency, it's better to use Auto Loader, which is available on Databricks). Also, to keep costs under control, you can set up a retention policy for your S3 destination to remove or archive files after some period of time, such as one week or one month.
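As a sketch of the second half of that setup, the job reading the Firehose output with Auto Loader could look like the following in PySpark. The function names, bucket path, schema location, and checkpoint path are hypothetical placeholders; it assumes Firehose is already delivering files to the bucket.

```python
def autoloader_options(file_format, schema_location):
    """Build Auto Loader options (cloudFiles source on Databricks)."""
    return {
        "cloudFiles.format": file_format,
        # Where Auto Loader tracks the inferred schema between runs.
        "cloudFiles.schemaLocation": schema_location,
    }


def run_once_from_s3(spark, source_path, target_path, checkpoint_path, options):
    """Ingest the files that arrived since the last run, then terminate."""
    files_df = spark.readStream.format("cloudFiles").options(**options).load(source_path)
    query = (
        files_df.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)  # PySpark spelling of Trigger.Once
        .start(target_path)
    )
    query.awaitTermination()
    return query
```

For example, run_once_from_s3(spark, "s3://my-bucket/firehose/", "/out/path", "/out/_checkpoint", autoloader_options("json", "/out/_schema")) would process only the new files on each scheduled run.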
Upvotes: 3