Calling Trigger once in Databricks to process Kinesis Stream

I am looking for a way to trigger my Databricks notebook once to process a Kinesis stream, using the following pattern:

import org.apache.spark.sql.streaming.Trigger

// Load your streaming DataFrame
val sdf = spark.readStream.format("json").schema(my_schema).load("/in/path")

// Perform transformations and then write…
sdf.writeStream.trigger(Trigger.Once).format("delta").start("/out/path")

It looks like this isn't possible with AWS Kinesis, and that's what the Databricks documentation suggests as well. My question is: what else can we do to achieve that?

Upvotes: 5

Views: 1935

Answers (3)

Marco Roy

Reputation: 5285

Since Databricks DBR 13.3, it's now possible to use Trigger.AvailableNow (.trigger(availableNow=True) in PySpark) to process data in batches from Kinesis.

See the Databricks documentation for details.
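For reference, a minimal PySpark sketch of that pattern, assuming the Databricks Kinesis connector; the stream name, region, and paths below are placeholders:

df = (spark.readStream
    .format("kinesis")
    .option("streamName", "my-stream")          # placeholder stream name
    .option("region", "us-east-1")              # adjust to your region
    .option("initialPosition", "trim_horizon")
    .load())

(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/chk/path")  # placeholder checkpoint location
    .trigger(availableNow=True)                 # process the backlog in batches, then stop (DBR 13.3+)
    .start("/out/path"))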

Previously, we were running a cluster 24/7 with .trigger(processingTime="300 seconds") in order to process a new batch every 5 minutes.

Now, we are processing one batch at a time and then letting the run complete. And we have configured the job with a continuous trigger, so that as soon as the job completes, another one is triggered (but it takes a while for the cluster to start, so it doesn't run right away).

This has allowed us to reduce the costs for that job by over 50%. 💸

Bonus: We can now do other things in between the runs/batches, such as vacuuming the table (using an inventory table, of course).

Upvotes: 1

Quentin

Reputation: 3290

A workaround is to stop after X runs, without a trigger. It'll guarantee a fixed number of rows per run. The only issue is that if you have millions of rows waiting in the queue, you won't have the guarantee of processing all of them.

In Scala you can add an event listener; in Python, count the number of batches.

from time import sleep
s = sdf.writeStream.format("delta").start("/out/path")

# keep the default spark.sql.streaming.numRecentProgressUpdates=100 so the list holds enough updates; stop after 10 micro-batches
# maxRecordsPerFetch is 10,000 by default, so we will consume at most 10 x 10,000 = 100,000 messages per run
while len(s.recentProgress) < 10:
  print("Batches #: " + str(len(s.recentProgress)))
  sleep(10)
s.stop()

You can have more advanced logic that counts the number of messages processed per batch and stops when the queue is empty (the throughput should drop once the backlog is consumed, as you'll only get the "real-time" flow, not the history).
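A sketch of that idea, assuming the numInputRows field of the recent progress updates is a good enough proxy for an empty queue (the 10-second poll interval is illustrative):

from time import sleep

s = sdf.writeStream.format("delta").start("/out/path")

while True:
    sleep(10)
    progress = s.recentProgress
    # stop once the latest micro-batch read nothing new, i.e. the backlog is drained
    if progress and progress[-1]["numInputRows"] == 0:
        break
s.stop()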

Upvotes: 1

Alex Ott

Reputation: 87249

As you mentioned in the question, Trigger.Once isn't supported for Kinesis.

But you can achieve what you need by adding Kinesis Data Firehose into the picture: it will write data from Kinesis into an S3 bucket (you can select the format you need, such as Parquet or ORC, or just leave it as JSON). Then you can point the streaming job at that bucket and use Trigger.Once for it, since it's a normal file-based streaming source (for efficiency it's better to use Auto Loader, which is available on Databricks). Also, to keep costs under control, you can set up a retention policy on the S3 destination to remove or archive files after some period of time, such as a week or a month.
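A minimal Auto Loader sketch of that setup, assuming Firehose delivers JSON to the bucket; the bucket path, schema location, and checkpoint path are placeholders:

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")                  # match the format Firehose writes
    .option("cloudFiles.schemaLocation", "/schemas/in")   # placeholder schema location
    .load("s3://my-firehose-bucket/in/path"))             # hypothetical Firehose destination

(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/chk/path")            # placeholder checkpoint location
    .trigger(once=True)                                   # Trigger.Once works for file-based sources
    .start("/out/path"))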

Upvotes: 3
