BusyBee
BusyBee

Reputation: 95

How to iterate Big Query TableResult correctly?

I have a complex join query in Big Query and need to run in a spark job. This is the current code:

val bigquery = BigQueryOptions.newBuilder().setProjectId(bigQueryConfig.bigQueryProjectId)
      .setCredentials(credentials)
      .build().getService

val query =
      //some complex query

val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(
        query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
        .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()

val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()

val result = queryJob.getQueryResults()

val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>

//create case class from the row
}

It keeps running into this error:

Exceeded rate limits: Your project: XXX exceeded quota for tabledata.list bytes per second per project.

Is there a way to better iterate through the results? I have tried to do setPriority(QueryJobConfiguration.Priority.BATCH) on the query job configuration, but it doesn't improve results. Also tried to reduce the number of spark executors to 1, but of no use.

Upvotes: 0

Views: 1946

Answers (2)

BusyBee
BusyBee

Reputation: 95

We resolved the situation by providing a custom page size on the TableResult

Upvotes: 0

David Rabinowitz
David Rabinowitz

Reputation: 30448

Instead of reading the query results directly, you can use the spark-bigquery-connector to read them into a DataFrame:

val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder(
        query)
        .setUseLegacySql(false)
        .setPriority(QueryJobConfiguration.Priority.BATCH) //(tried with and without)
        .setDestinationTable(TableId.of(destinationDataset, destinationTable))
        .build()
val jobId: JobId = JobId.newBuilder().setRandomJob().build()

val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build).waitFor()

val result = queryJob.getQueryResults()

// read into DataFrame
val data = spark.read.format("bigquery")
  .option("dataset",destinationDataset)
  .option("table" destinationTable)
  .load()

Upvotes: 2

Related Questions