Dmytro Pavlov

Reputation: 51

How to run BigQueryIO.read().fromQuery with parameters

I need to run multiple queries from a single .sql file, each with different parameters. I've tried something like the following, but it does not work because BigQueryIO.Read can only be applied to a PBegin.

  public PCollection<KV<String, TestDitoDto>> expand(PCollection<QueryParamsBatch> input) {

    // Does not compile: BigQueryIO's read transforms are PTransform<PBegin, ...>, so they can
    // only be applied to the pipeline root (PBegin), not to a PCollection such as `input`.
    PCollection<KV<String, Section1Dto>> section1 = input.apply("Read Section1 from BQ",
                BigQueryIO
                        .readTableRows()
                        .fromQuery(ResourceRetriever.getResourceFile("query/test/section1.sql"))
                        .usingStandardSql()
                        .withoutValidation())
            .apply("Convert section1 to Dto", ParDo.of(new TableRowToSection1DtoFunction()));
  }

Is there any other way to pass parameters from an existing PCollection into my BigQueryIO.read() invocation?

Upvotes: 1

Views: 682

Answers (2)

Dmytro Pavlov

Reputation: 51

I've come up with the following solution: instead of BigQueryIO, use the regular Google Cloud client library for BigQuery. Because the BigQuery client is not Serializable, mark the field as transient and initialize it in a method annotated with @Setup, which Beam calls on each DoFn instance before processing.

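import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.threeten.bp.Duration; // setInitialRpcTimeout/setMaxRpcTimeout take a threeten Duration

public class DenormalizedCase1Fn extends DoFn<*> {

  // The BigQuery client is not Serializable: mark it transient so it is not
  // serialized with the DoFn, and rebuild it on each worker in @Setup.
  private transient BigQuery bigQuery;

  @Setup
  public void initialize() {
    // bqProjectId and LOCATION are defined elsewhere in this class (not shown).
    this.bigQuery = BigQueryOptions.newBuilder()
            .setProjectId(bqProjectId.get())
            .setLocation(LOCATION)
            .setRetrySettings(RetrySettings.newBuilder()
                    .setRpcTimeoutMultiplier(1.5)
                    .setInitialRpcTimeout(Duration.ofSeconds(5))
                    .setMaxRpcTimeout(Duration.ofSeconds(30))
                    .setMaxAttempts(3).build())
            .build().getService();
  }

  @ProcessElement 
  ...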
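
A minimal sketch of what the elided @ProcessElement part could look like, not the author's actual code. It assumes the .sql file uses a named parameter `@batch_id` (hypothetical), that each input element is just that parameter's value as a String, and that only the first column of each result row is emitted; `ParameterizedQueryFn` and `buildQuery` are illustrative names. For brevity it uses `BigQueryOptions.getDefaultInstance()` instead of the explicit retry configuration shown above.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.TableResult;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn: each input element is the value of the (hypothetical)
// @batch_id named parameter; each output is the first column of a result row.
public class ParameterizedQueryFn extends DoFn<String, String> {

  private final String sql;              // SQL text, read once (e.g. from the .sql file)
  private transient BigQuery bigQuery;   // not Serializable, rebuilt per worker in @Setup

  public ParameterizedQueryFn(String sql) {
    this.sql = sql;
  }

  // Same SQL text every time; only the bound parameter value changes.
  public static QueryJobConfiguration buildQuery(String sql, String batchId) {
    return QueryJobConfiguration.newBuilder(sql)
        .setUseLegacySql(false)          // query parameters require standard SQL
        .addNamedParameter("batch_id", QueryParameterValue.string(batchId))
        .build();
  }

  @Setup
  public void setup() {
    // Default project and credentials from the environment.
    bigQuery = BigQueryOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(@Element String batchId, OutputReceiver<String> out)
      throws InterruptedException {
    // Runs the query job and waits for it to finish before reading rows.
    TableResult result = bigQuery.query(buildQuery(sql, batchId));
    for (FieldValueList row : result.iterateAll()) {
      out.output(row.get(0).getStringValue());
    }
  }
}
```

It would be applied as `ParDo.of(new ParameterizedQueryFn(sql))` on a `PCollection<String>` of parameter values. Binding values as query parameters, rather than concatenating them into the SQL, keeps one SQL file usable for every element.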

Upvotes: 0

chamikara

Reputation: 2054

Are the different queries/parameters available at pipeline construction time? If so, you could simply create multiple read transforms and combine their results, for example with a Flatten transform.
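
A minimal sketch of that construction-time approach, assuming the parameter values are known when the pipeline is built. Because `fromQuery()` takes plain SQL text, the hypothetical helper `queryFor` inlines each value by replacing a hypothetical `@batch_id` placeholder in the SQL; this is only safe for trusted values. `FlattenReadsSketch`, `readAll` and `queryFor` are illustrative names, not part of Beam.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenReadsSketch {

  // Hypothetical helper: inlines a trusted value into the SQL text as a string
  // literal (no escaping is done, so untrusted input must not reach here).
  static String queryFor(String sqlTemplate, String batchId) {
    return sqlTemplate.replace("@batch_id", "'" + batchId + "'");
  }

  // One BigQueryIO read per value, all applied at the pipeline root, then merged.
  static PCollection<TableRow> readAll(Pipeline p, String sqlTemplate, List<String> batchIds) {
    List<PCollection<TableRow>> parts = new ArrayList<>();
    for (String batchId : batchIds) {
      parts.add(p.apply("Read batch " + batchId, // step names must be unique
          BigQueryIO.readTableRows()
              .fromQuery(queryFor(sqlTemplate, batchId))
              .usingStandardSql()
              .withoutValidation()));
    }
    // Flatten merges PCollections of the same element type into one.
    return PCollectionList.of(parts).apply("Flatten reads", Flatten.pCollections());
  }
}
```

Each read runs its own BigQuery query job, so this fits a small, fixed set of parameters rather than values that only appear at run time.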

The Beam Java BigQuery source does not currently support reading a PCollection of queries. The Python BigQuery source does, though.

Upvotes: 1
