Reputation: 2929
I'm using the .NET client API (IUploadProgress progress = insertMediaUpload.Upload()) to upload a CSV file to BigQuery. Basically, what I do is:
1. submit an upload job,
2. get the job status (pending, running, done, ...),
3. if BigQuery reports any errors, print them and throw an exception for further handling.
The code below does not do exactly what I want, and I hope someone can help me improve it.
Specifically, I see several strange behaviors:
1. Even when I run the same code on the same CSV, which was intentionally made to fail, the BigQuery error messages parsed in UploadOnResponseReceived() are printed on some runs but not on others. Why?
2. The IUploadProgress value seems to depend on what UploadOnResponseReceived() does: if I do nothing in UploadOnResponseReceived, progress.Status is always Completed; if UploadOnResponseReceived throws an exception, progress.Status is Failed.
3. When progress.Status is Failed, there seems to be no way to get the exception thrown from UploadOnResponseReceived. I do need that exception; how can I get it?
public bool ExecuteUploadJobToTable(string dataset, string tableId, string filePath, TableSchema schema, string createDisposition, char delimiter)
{
    TableReference destTable = new TableReference { ProjectId = _account.ProjectId, DatasetId = dataset, TableId = tableId };
    JobConfigurationLoad configLoad = new JobConfigurationLoad
    {
        Schema = schema,
        DestinationTable = destTable,
        Encoding = "ISO-8859-1",
        CreateDisposition = "CREATE_IF_NEEDED",
        WriteDisposition = createDisposition,
        FieldDelimiter = delimiter.ToString(),
        AllowJaggedRows = true,
        SourceFormat = "CSV"
    };
    JobConfiguration config = new JobConfiguration { Load = configLoad };
    Job job = new Job { Configuration = config };
    // Set the job reference (mainly the job ID).
    JobReference jobRef = new JobReference
    {
        JobId = GenerateJobID("Upload"),
        ProjectId = _account.ProjectId
    };
    job.JobReference = jobRef;
    bool isSuccess = true;
    using (var fileStream = new FileStream(filePath, FileMode.Open))
    {
        JobsResource.InsertMediaUpload insertMediaUpload = new JobsResource.InsertMediaUpload(BigQueryService, job, job.JobReference.ProjectId, stream: fileStream, contentType: "application/octet-stream");
        insertMediaUpload.ProgressChanged += UploadOnProgressChanged;
        insertMediaUpload.ResponseReceived += UploadOnResponseReceived;
        Console.WriteLine("start {0}", jobRef.JobId);
        IUploadProgress progress = insertMediaUpload.Upload();
        // Compare against the enum instead of matching on its string form.
        if (progress.Status == UploadStatus.Failed)
        {
            isSuccess = false;
        }
    }
    Console.WriteLine(isSuccess);
    return isSuccess;
}
private void UploadOnProgressChanged(IUploadProgress process)
{
    Console.WriteLine(process.Status + " " + process.BytesSent);
}
// Throwing an exception here makes IUploadProgress "Failed"; otherwise it is "Completed".
private void UploadOnResponseReceived(Job job)
{
    try
    {
        job = PollUntilJobDone(job.JobReference, 5);
    }
    catch (Exception e)
    {
        Console.WriteLine("Unexpected non-retryable exception while polling job status");
        throw new BigQueryException("Unexpected non-retryable exception while polling job status", e);
    }
    // PollUntilJobDone returns null when the job never reached DONE.
    if (job == null)
    {
        throw new BigQueryException("Job did not reach DONE before polling gave up");
    }
    StringBuilder errorMessageBuilder = new StringBuilder();
    ErrorProto fatalError = job.Status.ErrorResult;
    IList<ErrorProto> errors = job.Status.Errors;
    if (fatalError != null)
    {
        errorMessageBuilder.AppendLine("Job failed while writing to BigQuery. " + fatalError.Reason + ": " + fatalError.Message +
            " at " + fatalError.Location);
    }
    if (errors != null)
    {
        foreach (ErrorProto error in errors)
        {
            errorMessageBuilder.AppendLine("Error: [REASON] " + error.Reason + " [MESSAGE] " + error.Message +
                " [LOCATION] " + error.Location);
        }
    }
    if (errorMessageBuilder.Length > 0) // a fatal error or at least one error was reported
    {
        Console.WriteLine(errorMessageBuilder.ToString());
        throw new BigQueryException(errorMessageBuilder.ToString());
    }
    Console.WriteLine("upload should be successful");
}
private Job PollUntilJobDone(JobReference jobReference, int pauseSeconds)
{
    int backoff = 1000; // backoff starts at 1 second plus a random offset
    for (int i = 0; i < 10; i++)
    {
        try
        {
            var pollJob = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();
            Console.WriteLine(jobReference.JobId + ": " + pollJob.Status.State);
            if (pollJob.Status.State.Equals("DONE"))
            {
                return pollJob;
            }
            // Pause execution for pauseSeconds before polling job status again,
            // to reduce unnecessary calls to the BigQuery API and lower overall
            // application bandwidth.
            Thread.Sleep(pauseSeconds * 1000);
        }
        catch (Exception e)
        {
            BigQueryException exception = new BigQueryException(e.Message, e);
            if (exception.IsTemporary)
            {
                int sleep = backoff + Random.Next(1000);
                Console.WriteLine("pollUntilJobDone job execute failed. Sleeping {0} ms before retry", sleep);
                Thread.Sleep(sleep);
            }
            else
            {
                throw;
            }
        }
        backoff *= 2;
    }
    return null;
}
Upvotes: 2
Views: 294
Reputation: 131
I spent weeks trying to figure out how to obtain upload progress using the .NET cloud library as there are no events exposed with BigQueryClient. Your code pieced together what I needed and I wanted to return the favor even though it has been 9+ years.
There are two stages when dealing with BQ: first the job upload, then the job status. You have to split them up. I need to upload large files of 700MB+, and the simple BigQueryClient.UploadJson() method blocks until the upload has finished, which is not optimal. Instead, I upload the file and report progress while it is sent. Once the upload is complete, I poll the job myself every X seconds and deal with the results. Each stage has its own exceptions to handle. Here's my entire function for the next person.
Imports Google.Api.Gax
Imports Google.Apis.Bigquery.v2
Imports Google.Cloud.BigQuery.V2
Imports System.Reflection
Public Function UploadJSON(ByVal datasetId As String, ByVal tableId As String, ByRef memStream As System.IO.MemoryStream) As Boolean
    'Set default dataset
    If String.IsNullOrEmpty(datasetId) Then datasetId = "my_dataset_id_in_bq"
    'Validation
    If Not IsNothing(memStream) AndAlso memStream.Length() > 0 AndAlso Not String.IsNullOrEmpty(tableId) Then
        'Create BQ client base classes
        Dim destTable As Data.TableReference = New Data.TableReference With {.ProjectId = crypt.DecryptData(BQCredential.ProjectId), .DatasetId = datasetId, .TableId = tableId}
        Dim configLoad As Data.JobConfigurationLoad = New Data.JobConfigurationLoad With {.Schema = GetSchemaFromObject(New CompletedContact()), .DestinationTable = destTable, .Encoding = "UTF-8", .SourceFormat = "NEWLINE_DELIMITED_JSON"}
        Dim config As Data.JobConfiguration = New Data.JobConfiguration With {.Load = configLoad}
        Dim job As Data.Job = New Data.Job With {.Configuration = config}
        Dim JobId As String = $"job_{System.Guid.NewGuid().ToString()}"
        Dim jobRef As Data.JobReference = New Data.JobReference With {.JobId = JobId, .ProjectId = crypt.DecryptData(BQCredential.ProjectId)}
        job.JobReference = jobRef
        'Create BQ client
        Dim BQClient As BigQueryClient
        If BQCredential.ServiceAccount.Length > 0 Then
            Dim clientBuilder As New BigQueryClientBuilder()
            clientBuilder.ProjectId = crypt.DecryptData(BQCredential.ProjectId)
            clientBuilder.JsonCredentials = crypt.DecryptData(BQCredential.ServiceAccount)
            BQClient = clientBuilder.Build()
        Else
            BQClient = BigQueryClient.Create(crypt.DecryptData(BQCredential.ProjectId))
        End If
        'Store byte size for math later
        totalStreamSize = memStream.Length()
        memStream.Seek(0, IO.SeekOrigin.Begin)
        'Create an InsertMediaUpload in order to hook into progress changed event during base service upload.
        Try
            Dim insertMediaUpload As JobsResource.InsertMediaUpload = New JobsResource.InsertMediaUpload(BQClient.Service, job, job.JobReference.ProjectId, memStream, contentType:="application/octet-stream")
            AddHandler insertMediaUpload.ProgressChanged, AddressOf UploadOnProgressChanged
            'AddHandler insertMediaUpload.ResponseReceived, AddressOf UploadOnResponseReceived
            MessageEvents.Raise(EvtMsgType.INFO, $"Creating BigQuery Job: {jobRef.JobId}, Uploading data...")
            'Start elapsed timer to calculate duration and Est. completion time
            elapsedTime = DateTime.Now()
            elapsedTimer.Enabled = True
            elapsedTimer.Start()
            Dim progress As Google.Apis.Upload.IUploadProgress = insertMediaUpload.Upload()
            elapsedTimer.Stop()
            elapsedTimer.Enabled = False
            RemoveHandler insertMediaUpload.ProgressChanged, AddressOf UploadOnProgressChanged
        Catch ex As Exception
            MessageEvents.Raise(EvtMsgType.INFO, $"Error creating BigQuery Job: {ex.Message}")
            Return False
        End Try
        Dim results As BigQueryJob = BQClient.GetJob(job.JobReference)
        MessageEvents.Raise(EvtMsgType.INFO, $"Upload complete. Polling Job Id: {results.Resource.JobReference.JobId}")
        Dim BQTimeoutCounter As Integer = 0
        Dim firstPass As Boolean = False
        Dim sleepTime As Integer = 5
        'Poll every sleepTime seconds, giving up after 60 seconds
        Do While results.Status.State <> "DONE" AndAlso BQTimeoutCounter < (60 \ sleepTime)
            Try
                BQTimeoutCounter += 1
                System.Threading.Thread.Sleep(sleepTime * 1000)
                results = BQClient.GetJob(results.Resource.JobReference)
                If results.State = JobState.Running AndAlso firstPass = False Then
                    Dim tmpOffset As DateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime)
                    MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId}, Started: {tmpOffset.LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}, Job Status: {results.Status.State}")
                    firstPass = True
                End If
            Catch ex As Exception
                MessageEvents.Raise(EvtMsgType.INFO, $"Error: {ex.Message}")
            End Try
        Loop
        Select Case results.State
            Case JobState.Done
                MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId} finished")
                MessageEvents.Raise(EvtMsgType.INFO, $"Summary:")
                If Not IsNothing(results.Status.Errors) Then
                    For Each Err As Data.ErrorProto In results.Status.Errors
                        MessageEvents.Raise(EvtMsgType.ERR, $"Reason: {Err.Reason}, Message: {Err.Message}")
                    Next
                End If
                If Not IsNothing(results.Statistics.Load) Then
                    MessageEvents.Raise(EvtMsgType.NONE, $" Input File Count: {results.Statistics.Load.InputFiles}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Total Input Size: {BytesToString(results.Statistics.Load.InputFileBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Output Row Count: {results.Statistics.Load.OutputRows}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Total Output Size: {BytesToString(results.Statistics.Load.OutputBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Bad Record Count: {results.Statistics.Load.BadRecords}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Start Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" End Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.EndTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $" Total Execution Time: {TimeSpan.FromMilliseconds(results.Statistics.FinalExecutionDurationMs).Duration()}")
                End If
            Case JobState.Pending
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout. Job is in pending state")
            Case JobState.Running
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout")
        End Select
        BQClient.Dispose()
        Return True
    End If
    Return False
End Function
Public Sub UploadOnProgressChanged(ByVal process As Google.Apis.Upload.IUploadProgress)
    Try
        'Avoid divide by zero
        Dim currentPercent As Double = 0
        If process.BytesSent <> 0 Then currentPercent = (process.BytesSent / totalStreamSize)
        Dim timeElapsed As TimeSpan = DateTime.Now.Subtract(elapsedTime)
        Dim remainingTime As TimeSpan
        If timeElapsed.TotalSeconds <> 0 AndAlso currentPercent <> 0 Then
            remainingTime = TimeSpan.FromSeconds(CDbl((timeElapsed.TotalSeconds / currentPercent) - timeElapsed.TotalSeconds))
        Else
            remainingTime = TimeSpan.Zero
        End If
        'BytesToString already handles a zero byte count
        Dim bytesSentText As String = BytesToString(process.BytesSent)
        MessageEvents.Raise(EvtMsgType.INFO, $"Status: {process.Status}, Sent: {bytesSentText}/{BytesToString(totalStreamSize)}, Progress: {Math.Floor(currentPercent * 100)}%, Elapsed Time: {timeElapsed.ToString("hh\:mm\:ss")}, Est. Completion: {remainingTime.ToString("hh\:mm\:ss")}")
    Catch ex As Exception
        MessageEvents.Raise(EvtMsgType.ERR, $"{ex.Message}")
    End Try
End Sub
Private Shared Function BytesToString(ByVal byteCount As Long) As String
    Dim suf As String() = {"B", "KB", "MB", "GB", "TB", "PB", "EB"}
    If byteCount = 0 Then Return "0" & suf(0)
    Dim bytes As Long = Math.Abs(byteCount)
    Dim place As Integer = Convert.ToInt32(Math.Floor(Math.Log(bytes, 1024)))
    Dim num As Double = Math.Round(bytes / Math.Pow(1024, place), 1)
    Return (Math.Sign(byteCount) * num).ToString() & suf(place)
End Function
Upvotes: 0
Reputation: 2057
Regarding your "how do I catch the exception" question: it seems the callbacks run asynchronously on another thread. If you throw an exception there, it is caught by whatever framework code invokes the callback, not by your calling code.
Searching for similar questions, I found two answers that might help: "Catching an exception thrown in an asynchronous callback", and "Tracking upload progress of WebClient", which shows how to update the UI from upload progress received on a background thread.
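One way to get at an exception that the calling framework swallows is to capture it inside the callback and rethrow it yourself once the call returns. Below is a minimal sketch of that pattern, not a definitive fix. FakeUploader, Demo and the job ID are hypothetical stand-ins so the example runs on its own; only ExceptionDispatchInfo is a real .NET type. It may also be worth checking progress.Exception on the returned IUploadProgress, which might already carry the error; I have not verified that for this case.

```csharp
using System;
using System.Runtime.ExceptionServices;

// Hypothetical stand-in for the uploader: it catches whatever a handler
// throws and only reports failure, similar to what the question describes.
class FakeUploader
{
    public event Action<string> ResponseReceived;

    public bool Upload()
    {
        try
        {
            ResponseReceived?.Invoke("job_123");
            return true;
        }
        catch (Exception)
        {
            return false; // the handler's exception is lost to the caller here
        }
    }
}

static class Demo
{
    // Returns the message of the exception recovered on the calling side,
    // or null if the upload completed without a captured exception.
    public static string UploadAndRecover()
    {
        ExceptionDispatchInfo captured = null;
        var uploader = new FakeUploader();

        uploader.ResponseReceived += jobId =>
        {
            try
            {
                // Stand-in for polling the job and finding errors.
                throw new InvalidOperationException("Load job " + jobId + " failed");
            }
            catch (Exception e)
            {
                captured = ExceptionDispatchInfo.Capture(e); // remember it
                throw; // still let the uploader see the failure
            }
        };

        bool completed = uploader.Upload();
        try
        {
            if (!completed && captured != null)
            {
                captured.Throw(); // rethrows with the original stack trace
            }
            return null;
        }
        catch (InvalidOperationException e)
        {
            return e.Message;
        }
    }

    static void Main()
    {
        Console.WriteLine(UploadAndRecover());
    }
}
```

In the question's code, the equivalent would be a field that UploadOnResponseReceived sets before throwing and that ExecuteUploadJobToTable inspects after Upload() returns.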
Upvotes: 2