user101010

Reputation: 117

Apache Beam / Pub/Sub time delay before processing files

I need to delay either the publishing of filenames or the processing of the corresponding files, and I am looking for the best option.

Currently I have two Apache Beam Dataflow jobs with a Pub/Sub topic in between. The first job reads filenames from the source and publishes them to a Pub/Sub topic. The second job reads those filenames and processes the files. However, my use case requires that processing (reading) of the actual files starts at least 1 hour after they are created in the source.

So I have two options:

1) Delay publishing each message, so that the consumer can process it immediately on receipt, at the expected moment

2) Delay processing of the files after their filenames are retrieved
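Whichever option is chosen, the underlying check is the same: a file becomes eligible once its creation time plus the minimum age (1 hour here) has passed. A minimal plain-Java sketch (java.time only, no Beam); the class `FileAgeGate` and method `remainingDelay` are hypothetical names, and it assumes the creation time is already known:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: how much longer a file must wait before it may be processed.
final class FileAgeGate {
    private FileAgeGate() {}

    // Returns Duration.ZERO if the file is already old enough, otherwise the remaining wait.
    static Duration remainingDelay(Instant createdAt, Instant now, Duration minAge) {
        Instant eligibleAt = createdAt.plus(minAge);
        return now.isBefore(eligibleAt) ? Duration.between(now, eligibleAt) : Duration.ZERO;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-01-01T12:00:00Z");
        Duration oneHour = Duration.ofHours(1);
        // Created 20 minutes ago: 40 minutes left to wait.
        System.out.println(remainingDelay(now.minus(Duration.ofMinutes(20)), now, oneHour)); // prints PT40M
        // Created 3 hours ago: eligible immediately.
        System.out.println(remainingDelay(now.minus(Duration.ofHours(3)), now, oneHour)); // prints PT0S
    }
}
```

With option 1 the publisher would hold each filename for this remaining delay; with option 2 the consumer would.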

As mentioned above, I am looking for the best solution. I am not sure whether the Guava retry mechanism should be used in Apache Beam. Any other ideas?

Upvotes: 0

Views: 880

Answers (1)

Jeff Klukas

Reputation: 1357

You could likely achieve what you want via triggering/window configuration in the publishing job.

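You could define a windowing configuration where the trigger does not fire until 1 hour after the first element in the pane arrives. Note that triggers only control when a grouping operation (GroupByKey or Combine) emits its results, so the job needs such a step after the window for the delay to take effect. Something like: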

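Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
      .triggering(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardHours(1)))
      // Required with a custom trigger; lateness kept above the 1-hour delay
      .withAllowedLateness(Duration.standardHours(2))
      .discardingFiredPanes()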

Keep in mind that you'll end up with a job that sits mostly idle, doing little except holding onto state for an hour. Also, the above is based solely on processing time: it waits an hour after a filename is first seen by the job (for files that already exist, that is roughly job start), even if the files were created long enough ago that they could be emitted immediately.
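To make that trade-off concrete, here is a plain-Java simulation (not Beam code; `ProcessingTimePane` and its methods are hypothetical) of a pane that fires a fixed delay after its first element arrives. It models only the timing rule of a processing-time trigger, and shows that files created long ago still wait the full hour once they arrive:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a "fire <delay> after the first element in the pane" trigger:
// the fire time depends only on arrival (processing) time, never on file creation time.
final class ProcessingTimePane {
    private final Duration delay;
    private final List<String> buffered = new ArrayList<>();
    private Instant firstArrival; // processing time of the first element; null while empty

    ProcessingTimePane(Duration delay) { this.delay = delay; }

    void add(String filename, Instant arrivalTime) {
        if (firstArrival == null) {
            firstArrival = arrivalTime;
        }
        buffered.add(filename);
    }

    // Processing time at which the pane would fire, or null if nothing is buffered.
    Instant fireTime() {
        return firstArrival == null ? null : firstArrival.plus(delay);
    }

    List<String> contents() { return new ArrayList<>(buffered); }

    public static void main(String[] args) {
        Instant jobStart = Instant.parse("2020-01-01T12:00:00Z");
        ProcessingTimePane pane = new ProcessingTimePane(Duration.ofHours(1));
        // Assume both files were created hours earlier but are only seen at job start.
        pane.add("old-file-1.csv", jobStart);
        pane.add("old-file-2.csv", jobStart.plusSeconds(30));
        System.out.println(pane.fireTime()); // prints 2020-01-01T13:00:00Z
        System.out.println(pane.contents()); // prints [old-file-1.csv, old-file-2.csv]
    }
}
```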

You could refine this to an event time trigger, but you would likely need to write your own code to assign timestamps to your records (the filenames). To my knowledge, Beam does not currently have built-in support for reading the creation time of files. When reading files via TextIO, for example, I have observed that the records are all assigned a default static timestamp. You should check the specifics of the transform you're using to read filenames to see if it perhaps does something more useful for your purposes. You can also use a WithTimestamps transform to assign timestamps on your own.
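As a starting point for assigning your own timestamps, a minimal sketch of a function that derives a timestamp from the file itself, assuming the files live on a local filesystem readable through `java.nio` (files in Cloud Storage would need the object's metadata instead, which this does not cover). `FileTimestamps.creationTimeOf` is a hypothetical name:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

// Hypothetical helper: derive an event timestamp for a filename from the file's attributes.
// Assumes a local filesystem; creation-time support varies by OS and filesystem.
final class FileTimestamps {
    private FileTimestamps() {}

    static Instant creationTimeOf(Path path) {
        try {
            BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
            FileTime created = attrs.creationTime();
            // Some filesystems report epoch zero when no creation time exists; use last-modified then.
            if (created.toMillis() == 0L) {
                created = attrs.lastModifiedTime();
            }
            return created.toInstant();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("example", ".txt");
        try {
            System.out.println(creationTimeOf(tmp));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

In a pipeline, similar logic could back a `WithTimestamps.of(...)` call, converting the result with `new org.joda.time.Instant(instant.toEpochMilli())`. Since creation times are usually earlier than an element's existing timestamp, check the timestamp-skew rules in the `WithTimestamps` documentation before relying on this.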

Upvotes: 2
