Mihir Sharma

Reputation: 132

Is it possible to write a single PCollection to different output sinks without using side inputs?

I have a specific use case for writing my pipeline data. I want to create a single Pub/Sub subscription, read from that single source, and write the PCollection to multiple sinks without creating another subscription. In other words, I want a single Dataflow job containing multiple branches that work in parallel and write the same pipeline data, first to Google Cloud Storage and then to BigQuery, using just one subscription. Code or references for this would be helpful and shed light on the direction I'm working in.

Thanks in advance!!

Upvotes: 2

Views: 373

Answers (2)

Mazlum Tosun

Reputation: 6572

You only need to use multiple sinks in your Beam job to meet your need.

In Beam you can build a PCollection and then write that PCollection to multiple sinks:

Example with Beam Python:

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms.window import FixedWindows

# Read once from the subscription and apply the shared transforms
result_pcollection = (
    inputs
    | 'Read from pub sub' >> ReadFromPubSub(subscription=subscription_path)
    | 'Map 1' >> beam.Map(your_map1)
    | 'Map 2' >> beam.Map(your_map2)
)

# Sink to Bigquery
(
    result_pcollection
    | 'Map 3' >> beam.Map(apply_transform_logic_bq)
    | 'Write to BQ' >> beam.io.WriteToBigQuery(
        project=project_id,
        dataset=dataset,
        table=table,
        method='YOUR_WRITE_METHOD',  # e.g. STREAMING_INSERTS for a streaming job
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
)

# Sink to GCS (windowing is required to write an unbounded source to files)
(
    result_pcollection
    | 'Map 4' >> beam.Map(apply_transform_logic_gcs)
    | 'Windowing logic' >> beam.WindowInto(FixedWindows(10 * 60))
    | 'Write to GCS' >> fileio.WriteToFiles(path=known_args.output)
)

To be able to write a streaming flow to GCS, you need to apply windowing and generate a file per window.

Upvotes: 2

Jeff Klukas

Reputation: 1357

Yes, this is definitely possible. In Java, you can do something like this:

PCollection<PubsubMessage> messages = p.apply(PubsubIO.read()...);

// Write messages to GCS
messages.apply(TextIO.write()...);

// Write messages to BQ
messages.apply(BigQueryIO.write()...);

The messages will only be consumed once from Pub/Sub. You can define multiple branches of your pipeline that all read from the same PCollection.

The downside here is really around error handling. If your BigQuery sink has errors that cause the pipeline to fail, it will be taking down your GCS output as well. It's harder to reason about these failure scenarios when you have multiple sinks in one pipeline.
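One way to soften this in the Java SDK: with streaming inserts, BigQueryIO can hand permanently failed rows back to the pipeline instead of failing the whole job. A minimal sketch, assuming a PCollection<TableRow> named rows; the table spec is illustrative, not from the original answer:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

// Retry transient errors; permanently failing rows are returned
// to the pipeline rather than crashing it (and the GCS branch with it)
WriteResult result = rows.apply("Write to BQ",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Rows BigQuery rejected; route them to a dead-letter sink of your choice
PCollection<TableRow> failedInserts = result.getFailedInserts();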

You mention writing first to Google Cloud Storage and then to BigQuery; if the order of writes is important (you don't want data showing up in BQ if it isn't also in GCS), that's significantly more difficult to express, and you'd likely be better off creating a second pipeline that reads from the GCS output of the first pipeline and writes to BQ, as sketched below.
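A minimal sketch of that second pipeline, assuming the first pipeline wrote newline-delimited JSON to GCS; the bucket path, table spec, and the parseToTableRow helper are illustrative, not from the original answer:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

Pipeline p2 = Pipeline.create(options);

// Read the files the first pipeline produced, one line per record
p2.apply("Read from GCS", TextIO.read().from("gs://my-bucket/output/*"))
  // Convert each line into a TableRow; parseToTableRow is a hypothetical helper
  .apply("Parse", MapElements
      .into(TypeDescriptor.of(TableRow.class))
      .via(line -> parseToTableRow(line)))
  .apply("Write to BQ", BigQueryIO.writeTableRows()
      .to("my-project:my_dataset.my_table")
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

p2.run().waitUntilFinish();

Run as a scheduled batch job, this gives you the ordering guarantee: rows only reach BQ after the corresponding file has landed in GCS.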

Upvotes: 1
