Fábio Uechi

Reputation: 837

How to use Pub/Sub notifications for Cloud Storage to trigger a Dataflow pipeline

I'm trying to integrate a Google Cloud Dataflow pipeline with Google Cloud Pub/Sub Notifications for Google Cloud Storage. The idea is to start processing a file as soon as it is created. The messages are being published, and with the PubsubIO.readMessagesWithAttributes() source I manage to extract the file URI:

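import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; // older Beam releases: org.apache.beam.sdk.util.gcsfs.GcsPath
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline p = Pipeline.create(options);
// Each Pub/Sub message is a Cloud Storage notification; its attributes identify the object.
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
            .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
            .fromSubscription(options.getPubsubSubscription()))
            .apply(MapElements
                    .into(TypeDescriptors.strings())
                    .via((PubsubMessage msg) -> {
                        String bucket = msg.getAttribute("bucketId");
                        String object = msg.getAttribute("objectId");
                        // Build the gs://bucket/object URI of the notified file.
                        GcsPath uri = GcsPath.fromComponents(bucket, object);
                        return uri.toString();
                    }));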
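The MapElements step above turns every notification into a URI, but Cloud Storage also publishes notifications for deletions, metadata updates and archiving, distinguished by the eventType attribute (OBJECT_FINALIZE is the one sent when an object is created or overwritten). A minimal sketch of a "filter, then build the URI" helper is below; the class and method names are hypothetical, and it takes a plain attribute map so it could be fed from msg.getAttributeMap().

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: turns Cloud Storage notification attributes into a gs:// URI. */
final class GcsNotificationUris {
    private GcsNotificationUris() {}

    /**
     * Returns gs://bucket/object for OBJECT_FINALIZE notifications (object created or
     * overwritten) and Optional.empty() for other event types such as OBJECT_DELETE,
     * or when the bucket/object attributes are missing.
     */
    static Optional<String> finalizedObjectUri(Map<String, String> attributes) {
        if (!"OBJECT_FINALIZE".equals(attributes.get("eventType"))) {
            return Optional.empty();
        }
        String bucket = attributes.get("bucketId");
        String object = attributes.get("objectId");
        if (bucket == null || object == null) {
            return Optional.empty();
        }
        return Optional.of("gs://" + bucket + "/" + object);
    }
}
```

Inside the pipeline, a FlatMapElements step could call this helper and emit nothing when the result is empty, so deletions never reach the file-reading stage.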

Which PTransform can be used to start reading/processing each file in the uris PCollection?

Upvotes: 2

Views: 3412

Answers (2)

jkff

Reputation: 17913

Apache Beam at HEAD includes a PTransform that does exactly what you want: TextIO.readAll() reads a PCollection<String> of file patterns or file names. It will be available in Beam 2.2.0, but for now you can build a snapshot of Beam yourself from the GitHub repo and depend on that.
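As a minimal sketch of this suggestion, the uris collection from the question can be passed straight to TextIO.readAll(), which treats each element as a file pattern and emits the lines of every matched file. The wrapper class ReadNewFiles is hypothetical, and the sketch assumes the uploaded files are line-oriented text.

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical wrapper: expands a PCollection of file names/patterns into their lines. */
final class ReadNewFiles {
    private ReadNewFiles() {}

    static PCollection<String> readLines(PCollection<String> uris) {
        // Each element is matched as a file pattern; every matched file is read
        // line by line, and the lines of all files end up in one PCollection.
        return uris.apply("ReadEachFile", TextIO.readAll());
    }
}
```

In the question's pipeline this would be `PCollection<String> lines = ReadNewFiles.readLines(uris);`. In later Beam releases readAll() is marked deprecated, and the same result is expressed as `uris.apply(FileIO.matchAll()).apply(FileIO.readMatches()).apply(TextIO.readFiles())`.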

Upvotes: 2

Matthias Baetens

Reputation: 1553

Combining Cloud Storage change notifications with Google Cloud Functions should be a good option (still in beta, though).

Using Cloud Functions, you can launch a Dataflow job with some JavaScript code. This is a very good blog post that should get you on your way. Your Dataflow job will kick off whenever a new file lands in a bucket or a file changes, and it will process these files.

If you want to stick to your approach, you might want to use the Google Cloud Storage Java SDK to read the files in a custom DoFn. I'm not sure whether that approach is preferable, though.
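A minimal sketch of the custom-DoFn idea, assuming the google-cloud-storage client library is on the classpath, that each element is a gs://bucket/object URI like the ones built in the question, and that the objects are small UTF-8 text files (each one is loaded into memory in full). ReadGcsObjectFn and its parse helper are hypothetical names.

```java
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;

/**
 * Hypothetical DoFn: reads each gs:// object with the Cloud Storage Java client
 * and emits its lines. Assumes small UTF-8 text objects (read fully into memory).
 */
class ReadGcsObjectFn extends DoFn<String, String> {
    // Created per DoFn instance in @Setup so the client is never serialized.
    private transient Storage storage;

    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    @ProcessElement
    public void processElement(@Element String uri, OutputReceiver<String> out) {
        String[] parts = parse(uri);
        byte[] content = storage.readAllBytes(BlobId.of(parts[0], parts[1]));
        if (content.length == 0) {
            return; // empty object: nothing to emit
        }
        for (String line : new String(content, StandardCharsets.UTF_8).split("\\r?\\n")) {
            out.output(line);
        }
    }

    /** Splits "gs://bucket/path/to/object" into {"bucket", "path/to/object"}. */
    static String[] parse(String uri) {
        if (!uri.startsWith("gs://")) {
            throw new IllegalArgumentException("Not a gs:// URI: " + uri);
        }
        String rest = uri.substring("gs://".length());
        int slash = rest.indexOf('/');
        if (slash <= 0 || slash == rest.length() - 1) {
            throw new IllegalArgumentException("Missing bucket or object name: " + uri);
        }
        return new String[] {rest.substring(0, slash), rest.substring(slash + 1)};
    }
}
```

It would be applied as `uris.apply(ParDo.of(new ReadGcsObjectFn()))`. Creating the client in @Setup rather than in the constructor keeps it out of the serialized DoFn and reuses one client per DoFn instance instead of one per element.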

Upvotes: 0
