Reputation: 11
I'm writing a Spark job in Scala on Google Dataproc that executes daily and processes records each marked with a transaction time. The records are grouped by year-month combo and each group is written to a separate monthly parquet file in GCS (e.g. 2018-07-file.parquet, 2018-08-file.parquet, etc.). Note that these files go back about 5 years and form a very large dataset (~1TB).
I want to write these files into BigQuery and have the job update only the monthly records that have changed in the current run. For simplicity, I'd like to delete the existing records for any month with updated ones and then just load in the data from the monthly parquet file.
I am trying to use the BigQuery connector for Dataproc, but it only seems to support updating an entire table and not a batch of records filtered by, for example, a date field.
What is the best way to do this? I tried including the full BigQuery library JAR in my project and using a data manipulation query to delete the existing monthly records, as shown below:
import java.util.UUID

import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, Job, JobId, JobInfo, QueryJobConfiguration}
import org.apache.commons.lang3.time.DateUtils // assumed: Apache Commons Lang 3 for addMonths
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset

def writeDataset(sparkContext: SparkContext, monthYear: String, ds: Dataset[TargetOrder]) = {
  val dtMonthYear = FeedWriter.parquetDateFormat.parse(monthYear)
  val bigquery: BigQuery = BigQueryOptions.getDefaultInstance.getService

  // Delete the existing records for this month before re-loading them. The upper bound is
  // formatted with the same pattern (concatenating the Date directly would give its toString form).
  val queryConfig: QueryJobConfiguration =
    QueryJobConfiguration.newBuilder("DELETE FROM `" + getBQTableName(monthYear) + "` " +
      "WHERE header.trans_time BETWEEN PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + monthYear + "') " +
      "AND PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + FeedWriter.parquetDateFormat.format(DateUtils.addMonths(dtMonthYear, 1)) + "') ")
      .setUseLegacySql(false)
      .build()

  val jobId: JobId = JobId.of(UUID.randomUUID().toString)
  val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
}
but I get the following error (I assume including the full BQ client JAR in a Dataproc job is not allowed, or perhaps it just doesn't play nice with the BQ connector):
java.lang.NoSuchMethodError: com.google.api.services.bigquery.model.JobReference.setLocation(Ljava/lang/String;)Lcom/google/api/services/bigquery/model/JobReference;
at com.google.cloud.bigquery.JobId.toPb(JobId.java:114)
at com.google.cloud.bigquery.JobInfo.toPb(JobInfo.java:370)
at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:198)
at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:187)
at ca.mycompany.myproject.output.BigQueryWriter$.writeDataset(BigQueryWriter.scala:39)
Upvotes: 0
Views: 2873
Reputation: 11
I found that including the full client JARs in a Dataproc job does not seem to work (which is presumably why they created separate connector extensions for BQ and other services). Instead, I ended up having my Dataproc job publish a message to a Pub/Sub topic indicating which monthly parquet file was updated. I then created a Cloud Function to monitor the Pub/Sub topic and spawn a BigQuery load job for only the changed monthly files.
I was able to delete just the monthly records from the BQ table by using a table partition decorator (e.g. MyTable$20180101) and grouping all of a month's records into the same day. Currently, BQ only supports partitioning a table by DAY and not by month, so I had to create a separate field on each record that is set to 2018-01-01 for all records in 2018-01-xx, for example (see the sketch below).
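For reference, a minimal sketch of how that per-record partition field could be derived in the Spark job before writing the parquet files. The withPartitionField helper and the DataFrame argument are made up for illustration, but header.trans_time and bigQueryPartition match the fields used elsewhere in this post:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, to_date, trunc}

// Hypothetical sketch: truncate each record's transaction time to the first day of
// its month, so every record for 2018-01-xx gets bigQueryPartition = 2018-01-01 and
// ends up in the day partition MyTable$20180101.
def withPartitionField(df: DataFrame): DataFrame =
  df.withColumn("bigQueryPartition", trunc(to_date(col("header.trans_time")), "month"))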
Sample of the Scala code in Dataproc to write to the Pub/Sub queue:
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}
import ca.my.company.config.ConfigOptions
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.pubsub.Pubsub
import com.google.api.services.pubsub.model.{PublishRequest, PubsubMessage}
import com.google.cloud.hadoop.util.RetryHttpInitializer
import org.apache.spark.streaming.pubsub.SparkGCPCredentials
import scala.collection.mutable
case class MyPubSubMessage(jobId: UUID, processedDate: Date, fileDate: Date, updatedFilePath: String)

object PubSubWriter {
  private val PUBSUB_APP_NAME: String = "MyPubSubWriter"
  private val messages: mutable.ListBuffer[PubsubMessage] = mutable.ListBuffer()
  private val publishRequest = new PublishRequest()
  private lazy val projectId: String = ConfigOptions().pubsubConfig.projectId
  private lazy val topicId: String = ConfigOptions().pubsubConfig.topicId

  private lazy val client = new Pubsub.Builder(
      GoogleNetHttpTransport.newTrustedTransport(),
      JacksonFactory.getDefaultInstance(),
      new RetryHttpInitializer(
        SparkGCPCredentials.builder.build().provider,
        PUBSUB_APP_NAME
      ))
    .setApplicationName(PUBSUB_APP_NAME)
    .build()

  // Buffers a notification describing one updated monthly parquet file
  def queueMessage(message: MyPubSubMessage): Unit = {
    if (message == null) return

    val targetFileDateFormat = new SimpleDateFormat("yyyyMMdd")
    val isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
    isoDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

    import scala.collection.JavaConversions._
    val pubSubMessage = new PubsubMessage()
      .setAttributes(Map("msgType" -> "t-log-notification", "jobId" -> message.jobId.toString, "processedDate" -> isoDateFormat.format(message.processedDate), "fileDate" -> targetFileDateFormat.format(message.fileDate)))

    messages.synchronized {
      messages.append(pubSubMessage.encodeData(message.updatedFilePath.getBytes))
    }
  }

  // Publishes all buffered messages to the configured topic in a single request
  def publishMessages(): Unit = {
    import scala.collection.JavaConversions._
    publishRequest.setMessages(messages)
    client.projects().topics()
      .publish(s"projects/$projectId/topics/$topicId", publishRequest)
      .execute()
    println(s"Update notifications: successfully sent ${messages.length} message(s) for topic '${topicId}' to Pub/Sub")
  }
}
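For example, the main Dataproc job can queue one notification per changed monthly file and publish them all at the end of the run; the file path and dates below are made up for illustration:

// Hypothetical usage from the Dataproc job, after an updated monthly parquet file is written
val fileDate = new SimpleDateFormat("yyyy-MM").parse("2018-07")
PubSubWriter.queueMessage(
  MyPubSubMessage(UUID.randomUUID(), new Date(), fileDate, "gs://my-bucket/2018-07-file.parquet"))

// ...queue one message per changed month, then send them all in a single publish request:
PubSubWriter.publishMessages()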
Sample of my Python cloud function to consume from the queue and spawn the BQ load job:
def update_bigquery(data, context):
    import base64
    from datetime import datetime
    from dateutil import parser
    from google.cloud import bigquery
    from google.cloud.bigquery.table import TimePartitioning
    from google.api_core.exceptions import GoogleAPICallError

    dataset_id = 'mydatasetname'
    table_id_base = 'mytablename'

    # The data field looks like this:
    # {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'fileDate': '20171201',
    #  'jobId': '69f6307e-28a1-40fc-bb6d-572c0bea9346', 'msgType': 't-log-notification',
    #  'processedDate': '2018-09-08T02:51:54Z'}, 'data': 'Z3M6Ly9nY3MtbGRsLWRzLWRhdGE...=='}

    # Retrieve the file path (filter out the SUCCESS file in the folder path) and build the partition name
    attributes = data['attributes']
    file_path = base64.b64decode(data['data']).decode('utf-8') + "/part*"
    partition_name = attributes['fileDate']
    table_partition = table_id_base + "$" + partition_name

    # Instantiate BQ client
    client = bigquery.Client()

    # Get reference to dataset and table
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_partition)

    try:
        # This only deletes the table partition and not the entire table
        client.delete_table(table_ref)  # API request
        print('Table {}:{} deleted.'.format(dataset_id, table_partition))
    except GoogleAPICallError as e:
        print('Error deleting table ' + table_partition + ": " + str(e))

    # Create BigQuery loading job
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.time_partitioning = TimePartitioning(field='bigQueryPartition')

    try:
        load_job = client.load_table_from_uri(
            file_path,
            dataset_ref.table(table_partition),
            job_config=job_config)  # API request
        print('Starting job {}'.format(load_job.job_id))

        # This can be commented out to allow the job to run purely asynchronously,
        # though if it fails, I'm not sure how I could be notified.
        # For now, I will set this function to the max timeout (9 mins) and see if the BQ load job can consistently complete in time.
        load_job.result()  # Waits for table load to complete.
        print('Job finished.')
    except GoogleAPICallError as e:
        print("Error running BQ load job: " + str(e))
        raise e

    return 'Success'
Upvotes: 1
Reputation: 41
How about bigquery4s?
It's a Scala wrapper around the BQ Java client. I had the same issue and it worked for me.
Upvotes: 0