Hi, I have a couple of queries that I want to run in sequence, one after another, saving the results of each, using Apache Beam. I've seen some similar questions but couldn't find an answer. I'm used to designing pipelines with Airflow and I'm fairly new to Apache Beam. I'm using the Dataflow runner. I would like query2 to run only after the query1 results are saved to the corresponding table. How do I chain them? Here's my code right now:
PCollection<TableRow> resultsStep1 = getData("Run Query 1",
        "Select * FROM basetable");
resultsStep1.apply("Save Query1 data",
        BigQueryIO.writeTableRows()
                .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                .to("resultsStep1")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
PCollection<TableRow> resultsStep2 = getData("Run Query 2",
        "Select * FROM resultsStep1");
resultsStep2.apply("Save Query2 data",
        BigQueryIO.writeTableRows()
                .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                .to("resultsStep2")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
);
And here's my getData function definition:
private PCollection<TableRow> getData(final String taskName, final String query) {
    return pipeline.apply(taskName,
            BigQueryIO.readTableRowsWithSchema()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withCoder(TableRowJsonCoder.of()));
}
Edit (Update): It turns out that, according to the BigQuery I/O documentation:
"You can’t sequence the completion of a BigQuery write with other steps of your pipeline."
I think this is a big limitation for designing pipelines. Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations
Upvotes: 1
You can use the Wait transform (Wait.on) to do this. A contrived example is below:
PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
data.apply(Wait.on(firstWriteResults))
    // Windows of this intermediate PCollection will be processed no earlier than when
    // the respective window of firstWriteResults closes.
    .apply(ParDo.of(...write to second database...));
You can find more details in the API documentation: https://beam.apache.org/releases/javadoc/2.17.0/index.html?org/apache/beam/sdk/transforms/Wait.html
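To make the contrived example concrete, here is a minimal runnable sketch of the same Wait.on pattern. It assumes the Beam Java SDK and the DirectRunner are on the classpath. The names WaitOnSketch, FirstWriteFn, SecondWriteFn, and EVENTS are hypothetical; the two DoFns only append to an in-memory list and stand in for real sinks. It shows the gating behaviour only. It is not a BigQuery solution: BigQueryIO's write returns a WriteResult rather than a PCollection, so whether a usable signal is available there depends on the write method and the Beam version.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class WaitOnSketch {
  // Hypothetical in-memory "sink" log. This only works on a single-JVM runner
  // such as the DirectRunner; it is an illustration device, not a real sink.
  static final List<String> EVENTS = Collections.synchronizedList(new ArrayList<>());

  /** Hypothetical stand-in for "write to first database"; emits no elements. */
  static class FirstWriteFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(@Element String row) {
      EVENTS.add("first:" + row);
    }
  }

  /** Hypothetical stand-in for "write to second database". */
  static class SecondWriteFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(@Element String row) {
      EVENTS.add("second:" + row);
    }
  }

  static List<String> runAndGetEvents() {
    EVENTS.clear();
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<String> data = pipeline.apply("CreateRows", Create.of("a", "b", "c"));

    // The output of the first write is used purely as a completion signal.
    PCollection<Void> firstWriteDone =
        data.apply("FirstWrite", ParDo.of(new FirstWriteFn()));

    // Elements of `data` are held back until the signal's window closes,
    // i.e. until the first write has processed every element in that window.
    data.apply("WaitForFirstWrite", Wait.on(firstWriteDone))
        .apply("SecondWrite", ParDo.of(new SecondWriteFn()));

    pipeline.run().waitUntilFinish();
    return new ArrayList<>(EVENTS);
  }

  public static void main(String[] args) {
    System.out.println(runAndGetEvents());
  }
}
```

Note that Wait.on gates an existing PCollection. A BigQueryIO read is applied at the root of the pipeline and has no input to hold back, so the second query in the question would need some upstream PCollection to wait on for this pattern to apply.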
Upvotes: 6