Reputation: 95
I have a complex join query in BigQuery that I need to run in a Spark job. This is the current code:
import scala.collection.JavaConverters._

val bigquery = BigQueryOptions.newBuilder()
  .setProjectId(bigQueryConfig.bigQueryProjectId)
  .setCredentials(credentials)
  .build()
  .getService

val query =
  // some complex query

val queryConfig: QueryJobConfiguration =
  QueryJobConfiguration.newBuilder(query)
    .setUseLegacySql(false)
    .setPriority(QueryJobConfiguration.Priority.BATCH) // (tried with and without)
    .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()
val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()

val result = queryJob.getQueryResults()
val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>
  // create a case class from the row
}
It keeps running into this error:
Exceeded rate limits: Your project: XXX exceeded quota for tabledata.list bytes per second per project.
Is there a better way to iterate through the results? I have tried setPriority(QueryJobConfiguration.Priority.BATCH) on the query job configuration, but it doesn't help. I also tried reducing the number of Spark executors to 1, but to no avail.
Upvotes: 0
Views: 1946
Reputation: 95
We resolved the situation by requesting a custom page size for the TableResult.
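A rough sketch of what that looks like, assuming the same queryJob and imports as in the question; the page size of 1000 rows is purely illustrative and should be tuned to your row width:
// request results in smaller pages so each tabledata.list call stays
// under the bytes-per-second quota; 1000 rows per page is an illustrative value
val result: TableResult =
  queryJob.getQueryResults(BigQuery.QueryResultsOption.pageSize(1000L))

val output = result.iterateAll().iterator().asScala.to[Seq].map { row: FieldValueList =>
  // create a case class from the row, as before
}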
Upvotes: 0
Reputation: 30448
Instead of reading the query results directly, you can write them to a destination table and use the spark-bigquery-connector to read that table into a DataFrame:
val queryConfig: QueryJobConfiguration =
  QueryJobConfiguration.newBuilder(query)
    .setUseLegacySql(false)
    .setPriority(QueryJobConfiguration.Priority.BATCH)
    .setDestinationTable(TableId.of(destinationDataset, destinationTable))
    .build()

val jobId: JobId = JobId.newBuilder().setRandomJob().build()
val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
val result = queryJob.getQueryResults()

// read the destination table into a DataFrame
val data = spark.read.format("bigquery")
  .option("dataset", destinationDataset)
  .option("table", destinationTable)
  .load()
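With this approach the Spark executors read the destination table through the connector's BigQuery Storage API read path, instead of the driver paging through tabledata.list, so the quota in the error message should no longer be hit.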
Upvotes: 2