I have a specific use case for writing my pipeline data. I want to create a single Pub/Sub subscription, read from that one source, and write the resulting PCollection to multiple sinks without creating a second subscription. In other words, I want a single Dataflow job with multiple branches working in parallel that write the same data, first to Google Cloud Storage and second to BigQuery, all from one subscription. Code or references would be helpful and would shed light on the direction I'm working in.
Thanks in advance!!
Upvotes: 2
Views: 373
You only need to use multiple sinks in your Beam job to meet your need. In Beam you can build a PCollection and then sink that PCollection to multiple places.
Example with Beam Python:
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms.window import FixedWindows

# Read once from the subscription; the same PCollection feeds both sinks below.
result_pcollection = (inputs
    | 'Read from pub sub' >> ReadFromPubSub(subscription=subscription_path)
    | 'Map 1' >> beam.Map(your_map1)
    | 'Map 2' >> beam.Map(your_map2)
)
# Sink to BigQuery
(result_pcollection
    | 'Map 3' >> beam.Map(apply_transform_logic_bq)
    | 'Write to BQ' >> beam.io.WriteToBigQuery(
        project=project_id,
        dataset=dataset,
        table=table,
        method='YOUR_WRITE_METHOD',  # e.g. STREAMING_INSERTS or FILE_LOADS
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
)
# Sink to GCS: window the stream so a file can be emitted per window
(result_pcollection
    | 'Map 4' >> beam.Map(apply_transform_logic_gcs)
    | 'Windowing logic' >> beam.WindowInto(FixedWindows(10 * 60))
    | 'Write to GCS' >> fileio.WriteToFiles(path=known_args.output)
)
To write a streaming flow to GCS, you need to apply windowing and generate a file per window.
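For completeness, here is a minimal harness the snippets above would plug into (an assumption on my part, not part of the answer); reading from Pub/Sub requires the pipeline to run in streaming mode:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub sources are unbounded

with beam.Pipeline(options=options) as p:
    inputs = p  # the root that the 'Read from pub sub' step above chains off
    # ... build result_pcollection and attach both sinks here ...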
Upvotes: 2
Yes, this is definitely possible. In Java, you can do something like this:
PCollection<PubsubMessage> messages = p.apply(PubsubIO.readMessages()...);
// Write messages to GCS (convert each message to a String first for TextIO)
messages.apply(TextIO.write()...);
// Write messages to BQ (convert each message to a TableRow first)
messages.apply(BigQueryIO.write()...);
The messages will only be consumed once from Pub/Sub. You can define multiple branches of your pipeline that all read from the same PCollection.
The downside here is really around error handling. If your BigQuery sink has errors that cause the pipeline to fail, it will be taking down your GCS output as well. It's harder to reason about these failure scenarios when you have multiple sinks in one pipeline.
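One common mitigation, sketched here in the Python SDK (my addition, not part of this answer; rows and table_spec are placeholders), is to capture rows rejected by streaming inserts as a tagged output instead of letting them fail the job:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

result = rows | 'Write to BQ' >> beam.io.WriteToBigQuery(
    table=table_spec,
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
    insert_retry_strategy=RetryStrategy.RETRY_NEVER)

# Rows BigQuery rejected come back on a tagged output; route them to a
# dead-letter sink so one bad record does not take down the GCS branch too.
failed_rows = result[BigQueryWriteFn.FAILED_ROWS]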
You mention "firstly in Google Cloud Storage and Secondly at Bigquery"; if the order of writes is important (you don't want data showing up in BQ if it isn't also in GCS), that's significantly more difficult to express, and you'd likely be better off creating a second pipeline that reads from the GCS output of the first pipeline and writes to BQ.
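That second pipeline could look roughly like this (a hedged Python sketch; the bucket path, table name, and newline-delimited JSON format are assumptions, not from the question):
import json
import apache_beam as beam

with beam.Pipeline() as p:
    (p
        | 'Read GCS output' >> beam.io.ReadFromText('gs://your-bucket/output/*')
        | 'Parse JSON' >> beam.Map(json.loads)
        | 'Write to BQ' >> beam.io.WriteToBigQuery(
            table='your-project:your_dataset.your_table',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
    )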
Upvotes: 1