Reputation: 937
I have a pipeline that streams JSON messages from Pub/Sub (an unbounded PCollection) to Google Cloud Storage. Each file contains multiple JSON objects, one per line.
I want to create another pipeline that reads all the JSON objects from this GCS bucket for further stream processing. The most important thing is that this second pipeline should work as a stream rather than a batch. That is, I want it to "listen" to the bucket and process every JSON object written to it, as an unbounded PCollection.
Is there any way to achieve this behavior?
Thanks
Upvotes: 0
Views: 407
Reputation: 1858
Another user provided a good answer that tells you how to do what you want; however, if I understand your problem correctly, I can recommend a cleaner approach.
Assuming that you want to both write the incoming JSON messages to GCS in windowed files, as your first pipeline does now, and stream-process each individual message, then you can instead create one pipeline that simply forks after the "read from Pub/Sub" step. Dataflow supports this natively very well. You would save a reference to the PCollection object returned when you read from Pub/Sub at the beginning of your pipeline. Then, you would apply multiple chains of DoFn implementations (and other transforms) to this one reference. You'll be able to keep the windowed writes to GCS as you do now, plus process each individual message in any way you like.
It might look like this:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;

Pipeline pipeline = Pipeline.create(options);
PCollection<String> messages =
    pipeline.apply("Read from Pub/Sub", PubsubIO.readStrings().fromTopic("my_topic_name"));
// Current pipeline fork for windowing into GCS
messages.apply("Handle for GCS", ...);
// New fork for more handling
messages.apply("More stuff", ...);
Upvotes: 0
Reputation: 75715
The streaming process only works with a Pub/Sub data source. But don't worry, you can still achieve your pipeline.
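The answer above doesn't include code, so here is only a minimal sketch of one common way to do this, under these assumptions: Cloud Storage Pub/Sub notifications are enabled on the bucket (so every new object publishes an OBJECT_FINALIZE message), and the topic name, project, and gsutil command are illustrative, not from the original answer.

// Minimal sketch (assumptions noted above): stream-process files as they land in GCS
// by reading the bucket's notifications from Pub/Sub. Create the notification config first, e.g.:
//   gsutil notification create -t gcs-new-files -f json -e OBJECT_FINALIZE gs://my-bucket
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline pipeline = Pipeline.create(options);

PCollection<String> jsonLines = pipeline
    // Each notification message carries the bucket and object name as attributes
    .apply("Read GCS notifications", PubsubIO.readMessagesWithAttributes()
        .fromTopic("projects/my-project/topics/gcs-new-files"))
    .apply("To file path", MapElements.into(TypeDescriptors.strings())
        .via((PubsubMessage msg) ->
            "gs://" + msg.getAttribute("bucketId") + "/" + msg.getAttribute("objectId")))
    // Match and open each new file, then emit one element per line (one JSON object per line)
    .apply("Match files", FileIO.matchAll())
    .apply("Read matches", FileIO.readMatches())
    .apply("Read lines", TextIO.readFiles());

// jsonLines is an unbounded PCollection<String>; continue your stream processing from here.

Note that the single forked pipeline from the other answer avoids the GCS round-trip altogether; a sketch like this is only needed if the second pipeline has to stay separate.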
Upvotes: 2