SarfarazSoomro

Reputation: 413

Sequential Execution in Apache Beam - Java SDK 2.18.0

Hi, I have a couple of queries that I want to run in sequence with Apache Beam, saving each result before the next query starts. I've seen some similar questions but couldn't find an answer. I'm used to designing pipelines in Airflow and I'm fairly new to Apache Beam. I'm using the Dataflow runner.

Here's my code right now. I would like query2 to run only after the query1 results are saved to the corresponding table. How do I chain them?

    PCollection<TableRow> resultsStep1 = getData("Run Query 1",
            "Select * FROM basetable");

    resultsStep1.apply("Save Query1 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                    .to("resultsStep1")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    PCollection<TableRow> resultsStep2 = getData("Run Query 2",
            "Select * FROM resultsStep1");

    resultsStep2.apply("Save Query2 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                    .to("resultsStep2")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

And here's my getData function definition:

    private PCollection<TableRow> getData(final String taskName, final String query) {
        return pipeline.apply(taskName,
                BigQueryIO.readTableRowsWithSchema()
                        .fromQuery(query)
                        .usingStandardSql()
                        .withCoder(TableRowJsonCoder.of()));
    }

Edit (Update): It turns out that, according to the Beam documentation, "You can’t sequence the completion of a BigQuery write with other steps of your pipeline."

I think this is a big limitation for designing pipelines. Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations

Upvotes: 1

Views: 2425

Answers (1)

Jayadeep Jayaraman

Reputation: 2825

You can use the Wait.on transform to do this. A contrived example is below:

    PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
    data.apply(Wait.on(firstWriteResults))
        // Windows of this intermediate PCollection will be processed no earlier than when
        // the respective window of firstWriteResults closes.
        .apply(ParDo.of(...write to second database...));

You can find more details in the API documentation: https://beam.apache.org/releases/javadoc/2.17.0/index.html?org/apache/beam/sdk/transforms/Wait.html
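To make the contrived example concrete, here is a minimal runnable sketch of the same pattern. Everything in it beyond Wait.on and ParDo is an assumption for illustration: the class name WaitOnSketch, the two DoFns, and the in-memory LOG are hypothetical stand-ins for real writes, and it assumes the Beam Java SDK plus the direct runner are on the classpath. Note that Wait.on takes a PCollection as its signal, so this sketch sequences ParDo-based writes. It does not show how to wait on BigQueryIO.Write, which returns a WriteResult rather than a PCollection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class WaitOnSketch {

    // Illustration only: a static log is shared with the DoFns only when the
    // pipeline runs inside this JVM (direct runner), not on Dataflow workers.
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for "write to first database". It emits no output
    // elements; the resulting PCollection<Void> is used purely as a signal.
    static class FirstWriteFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(@Element String row) {
            LOG.add("first:" + row);
        }
    }

    // Hypothetical stand-in for "write to second database".
    static class SecondWriteFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(@Element String row) {
            LOG.add("second:" + row);
        }
    }

    static List<String> runAndGetLog() {
        LOG.clear();
        // With no runner specified, Beam falls back to the direct runner if present.
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        PCollection<String> data = pipeline.apply("Create rows", Create.of("a", "b", "c"));

        PCollection<Void> firstWriteResults =
                data.apply("First write", ParDo.of(new FirstWriteFn()));

        // The second write sees the same elements as the first, but only after
        // the corresponding window of firstWriteResults has closed.
        data.apply("Wait for first write", Wait.on(firstWriteResults))
                .apply("Second write", ParDo.of(new SecondWriteFn()));

        pipeline.run().waitUntilFinish();
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        runAndGetLog().forEach(System.out::println);
    }
}
```

When run locally, every "first:" entry should appear in the log before any "second:" entry, although the order within each group is not guaranteed.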

Upvotes: 6
