Reputation: 837
I'm trying to integrate a Google Cloud Dataflow pipeline with Google Cloud Pub/Sub Notifications for Google Cloud Storage.
The idea is to start processing a file as soon as it is created.
The messages are being published, and with the PubsubIO.readMessagesWithAttributes() source I manage to extract the file URI:
Pipeline p = Pipeline.create(options);

PCollection<String> uris = p
    .apply(PubsubIO.readMessagesWithAttributes()
        .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
        .fromSubscription(options.getPubsubSubscription()))
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((PubsubMessage msg) -> {
          String bucket = msg.getAttribute("bucketId");
          String object = msg.getAttribute("objectId");
          GcsPath uri = GcsPath.fromComponents(bucket, object);
          return uri.toString();
        }));
Which PTransform can be used to start reading/processing each file in the uris PCollection?
Upvotes: 2
Views: 3412
Reputation: 17913
Apache Beam at HEAD includes a PTransform that does exactly what you want: TextIO.readAll() reads a PCollection<String> of filepatterns or filenames. It will be available in Beam 2.2.0, but for now you can build a snapshot of Beam yourself from the GitHub repo and depend on that.
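As a minimal sketch of how this would plug into the pipeline above (assuming a 2.2.0-SNAPSHOT build and that the objects are newline-delimited text files):

// TextIO.readAll() (org.apache.beam.sdk.io.TextIO) treats each element
// of uris as a filepattern and emits the lines of every matching file.
PCollection<String> lines = uris.apply(TextIO.readAll());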
Upvotes: 2
Reputation: 1553
Combining Cloud Storage change notifications with Google Cloud Functions should be a good option (still in beta, though).
Using Cloud Functions you can launch a Dataflow job from some JavaScript code. This is a very good blogpost that should get you on your way. Your Dataflow job will kick off whenever a new file lands in a bucket or a file changes, and it will process those files.
If you want to stick with your approach, you might want to use the Google Cloud Storage Java SDK to read the files in a custom DoFn. I'm not sure that approach is preferable, though.
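If you go that route, a rough sketch of such a DoFn could look like the following (this assumes the google-cloud-storage Java client library is on the classpath and that each file is small enough to read whole; ReadGcsFileFn is just an illustrative name, not an existing transform):

// Dependencies: com.google.cloud.storage.Storage / StorageOptions (Cloud Storage client),
// java.nio.charset.StandardCharsets, and the same GcsPath helper used in the question.
static class ReadGcsFileFn extends DoFn<String, String> {
  private transient Storage storage;

  @Setup
  public void setup() {
    // Create one Cloud Storage client per DoFn instance.
    storage = StorageOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // c.element() is a "gs://bucket/object" URI produced by the pipeline in the question.
    GcsPath path = GcsPath.fromUri(c.element());
    byte[] bytes = storage.readAllBytes(path.getBucket(), path.getObject());
    // Emit the whole file as a single String (only sensible for small files).
    c.output(new String(bytes, StandardCharsets.UTF_8));
  }
}

// Usage:
// PCollection<String> contents = uris.apply(ParDo.of(new ReadGcsFileFn()));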
Upvotes: 0