foxwendy

Reputation: 2929

Strange issue when .NET InsertMediaUpload to Upload CSV to BigQuery

I'm using the .NET Client API (IUploadProgress progress = insertMediaUpload.Upload()) to upload a CSV to BigQuery. Basically what I do is:

1. submit an upload job,
2. get the job status (pending, running, done, ...),
3. if BigQuery reports any errors, print them and throw an exception for further handling.

The code below is not doing exactly what I want, and I hope someone can help me improve it.

Specifically, several strange behaviors occur:
1. Even when running the same code against the same CSV (which was intentionally made to fail), the BigQuery error messages parsed out in UploadOnResponseReceived() are printed in some calls but not in others. Why?
2. The IUploadProgress value seems tied to the behavior of UploadOnResponseReceived(): if I do nothing in UploadOnResponseReceived, progress.Status is always 'Completed'; if UploadOnResponseReceived throws an exception, progress.Status is 'Failed'.
3. When progress.Status is 'Failed', there is no way to get the exception thrown from UploadOnResponseReceived. I do need that exception; how should I get it?

    public bool ExecuteUploadJobToTable(string dataset, string tableId, string filePath, TableSchema schema, string createDisposition, char delimiter)
    {

        TableReference destTable = new TableReference { ProjectId = _account.ProjectId, DatasetId = dataset, TableId = tableId };

        JobConfigurationLoad configLoad = new JobConfigurationLoad 
        {
            Schema = schema,
            DestinationTable = destTable,
            Encoding = "ISO-8859-1",
            CreateDisposition = "CREATE_IF_NEEDED",
            WriteDisposition = createDisposition,
            FieldDelimiter = delimiter.ToString(),
            AllowJaggedRows = true,
            SourceFormat = "CSV"
        };

        JobConfiguration config = new JobConfiguration {Load = configLoad};

        Job job = new Job {Configuration = config};

        //set job reference (mainly job id)
        JobReference jobRef = new JobReference
        {
            JobId = GenerateJobID("Upload"),
            ProjectId = _account.ProjectId
        };
        job.JobReference = jobRef;

        bool isSuccess = true;
        using (var fileStream = new FileStream(filePath, FileMode.Open))
        {
            JobsResource.InsertMediaUpload insertMediaUpload = new JobsResource.InsertMediaUpload(BigQueryService, job, job.JobReference.ProjectId, stream: fileStream, contentType: "application/octet-stream");
            insertMediaUpload.ProgressChanged += UploadOnProgressChanged;
            insertMediaUpload.ResponseReceived += UploadOnResponseReceived;

            Console.WriteLine(string.Format("start {0}",jobRef.JobId));
            IUploadProgress progress = insertMediaUpload.Upload();
            if (progress.Status.ToString().Contains("Fail"))
            {
                isSuccess = false;
            }
        }
        Console.WriteLine(isSuccess);
        return isSuccess;
    }

    private void UploadOnProgressChanged(IUploadProgress process)
    {
        Console.WriteLine(process.Status + " " + process.BytesSent);
    }

    //throwing an exception will make IUploadProgress "Failed"; otherwise, IUploadProgress will be "Completed"
    private void UploadOnResponseReceived(Job job)
    {
        try
        {
            job = PollUntilJobDone(job.JobReference, 5);
        }
        catch(Exception e)
        {
            Console.WriteLine("Unexcepted unretryable exception happens when poll job status");
            throw new BigQueryException("Unexcepted unretryable exception happens when poll job status",e);
        }

        StringBuilder errorMessageBuilder = new StringBuilder();
        ErrorProto fatalError = job.Status.ErrorResult;
        IList<ErrorProto> errors = job.Status.Errors;
        if (fatalError != null)
        {
            errorMessageBuilder.AppendLine("Job failed while writing to Bigquery. " + fatalError.Reason + ": " + fatalError.Message +
                      " at " + fatalError.Location);
        }
        if (errors != null)
        {
            foreach (ErrorProto error in errors)
            {
                errorMessageBuilder.AppendLine("Error: [REASON] " + error.Reason + " [MESSAGE] " + error.Message +
                                               " [LOCATION] " + error.Location);
            }

        }
        if (errorMessageBuilder.Length>0)//fatalError != null || errors != null  
        {
            Console.WriteLine(errorMessageBuilder.ToString());
            throw new BigQueryException(errorMessageBuilder.ToString());
        }
        Console.WriteLine("upload should be successful");
    }

    private Job PollUntilJobDone(JobReference jobReference, int pauseSeconds)
    {
        int backoff = 1000;//backoff starts from 1 sec + random

        for(int i = 0; i < 10; i++)
        {
            try
            {
                var pollJob = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();
                Console.WriteLine(jobReference.JobId + ": " + pollJob.Status.State);
                if (pollJob.Status.State.Equals("DONE"))
                {
                    return pollJob;
                }
                // Pause execution for pauseSeconds before polling job status again,
                // to reduce unnecessary calls to the BigQuery API and lower overall
                // application bandwidth.
                Thread.Sleep(pauseSeconds * 1000);
            }
            catch (Exception e)
            {
                BigQueryException exception = new BigQueryException(e.Message,e);
                if (exception.IsTemporary)
                {
                    int sleep = backoff + Random.Next(1000);
                    Console.WriteLine("pollUntilJobDone job execute failed. Sleeping {0} ms before retry", sleep);
                    Thread.Sleep(sleep);
                }
                else
                {
                    throw;
                }
            }
            backoff *= 2;
        }
        return null;
    }

Upvotes: 2

Views: 294

Answers (2)

akuma6099

Reputation: 131

I spent weeks trying to figure out how to obtain upload progress using the .NET cloud library, as there are no events exposed on BigQueryClient. Your code gave me the pieces I needed, and I wanted to return the favor even though it has been 9+ years.

There are two stages when dealing with BigQuery: first the job upload, then the job status. You have to split the two up. I need to upload large files of 700 MB+, and the simple BigQueryClient.UploadJson() method just sits there until the upload has finished, which is not optimal. Instead, I upload the file while reporting progress, and once that completes I poll the job myself every X seconds and deal with the results. Both stages have their own exceptions to handle. Here's my entire function for the next person.

Imports Google.Api.Gax
Imports Google.Apis.Bigquery.v2
Imports Google.Cloud.BigQuery.V2
Imports System.Reflection

Public Function UploadJSON(ByVal datasetId As String, ByVal tableId As String, ByRef memStream As System.IO.MemoryStream) As Boolean

    'Set default dataset
    If String.IsNullOrEmpty(datasetId) Then datasetId = "my_dataset_id_in_bq"

    'Validation
    If Not IsNothing(memStream) AndAlso memStream.Length() > 0 AndAlso Not String.IsNullOrEmpty(tableId) Then

        'Create BQ client base classes
        Dim destTable As Data.TableReference = New Data.TableReference With {.ProjectId = crypt.DecryptData(BQCredential.ProjectId), .DatasetId = datasetId, .TableId = tableId}
        Dim configLoad As Data.JobConfigurationLoad = New Data.JobConfigurationLoad With {.Schema = GetSchemaFromObject(New CompletedContact()), .DestinationTable = destTable, .Encoding = "UTF-8", .SourceFormat = "NEWLINE_DELIMITED_JSON"}
        Dim config As Data.JobConfiguration = New Data.JobConfiguration With {.Load = configLoad}
        Dim job As Data.Job = New Data.Job With {.Configuration = config}


        Dim JobId As String = $"job_{System.Guid.NewGuid().ToString()}"
        Dim jobRef As Data.JobReference = New Data.JobReference With {.JobId = JobId, .ProjectId = crypt.DecryptData(BQCredential.ProjectId)}
        job.JobReference = jobRef

        'Create BQ client
        Dim BQClient As BigQueryClient
        If BQCredential.ServiceAccount.Length > 0 Then
            Dim clientBuilder As New BigQueryClientBuilder()
            clientBuilder.ProjectId = crypt.DecryptData(BQCredential.ProjectId)
            clientBuilder.JsonCredentials = crypt.DecryptData(BQCredential.ServiceAccount)
            BQClient = clientBuilder.Build()
        Else
            BQClient = BigQueryClient.Create(crypt.DecryptData(BQCredential.ProjectId))
        End If

        'Store byte size for math later
        totalStreamSize = memStream.Length()
        memStream.Seek(0, IO.SeekOrigin.Begin)

        'Create an InsertMediaUpload in order to hook into progress changed event during base service upload.
        Try
            Dim insertMediaUpload As JobsResource.InsertMediaUpload = New JobsResource.InsertMediaUpload(BQClient.Service, job, job.JobReference.ProjectId, memStream, contentType:="application/octet-stream")

            AddHandler insertMediaUpload.ProgressChanged, AddressOf UploadOnProgressChanged
            'AddHandler insertMediaUpload.ResponseReceived, AddressOf UploadOnResponseReceived

            MessageEvents.Raise(EvtMsgType.INFO, $"Creating BigQuery Job: {jobRef.JobId}, Uploading data...")

            'Start elapsed timer to calculate duration and Est. completion time
            elapsedTime = DateTime.Now()
            elapsedTimer.Enabled = True
            elapsedTimer.Start()

            Dim progress As Google.Apis.Upload.IUploadProgress = insertMediaUpload.Upload()

            elapsedTimer.Stop()
            elapsedTimer.Enabled = False

            RemoveHandler insertMediaUpload.ProgressChanged, AddressOf UploadOnProgressChanged

        Catch ex As Exception
            MessageEvents.Raise(EvtMsgType.INFO, $"Error creating BigQuery Job: {ex.Message}")
            Return False
        End Try


        Dim results As BigQueryJob = BQClient.GetJob(job.JobReference)
        MessageEvents.Raise(EvtMsgType.INFO, $"Upload complete. Polling Job Id: {results.Resource.JobReference.JobId}")

        Dim BQTimeoutCounter As Integer = 0
        Dim firstPass As Boolean = False
        Dim sleepTime As Integer = 5

        Do While results.Status.State <> "DONE" And BQTimeoutCounter <> (60 / sleepTime) '60 seconds
            Try
                BQTimeoutCounter += 1
                System.Threading.Thread.Sleep(sleepTime * 1000)

                results = BQClient.GetJob(results.Resource.JobReference)

                If results.State = JobState.Running AndAlso firstPass = False Then
                    Dim tmpOffset As DateTimeOffset = DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime)
                    MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId}, Started: {tmpOffset.LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}, Job Status: {results.Status.State}")
                    firstPass = True
                End If
            Catch ex As Exception
                MessageEvents.Raise(EvtMsgType.INFO, $"Error: {ex.Message}")
            End Try
        Loop

        Select Case results.State
            Case JobState.Done
                MessageEvents.Raise(EvtMsgType.INFO, $"Job: {results.Resource.JobReference.JobId} finished")
                MessageEvents.Raise(EvtMsgType.INFO, $"Summary:")

                If Not IsNothing(results.Status.Errors) Then
                    For Each Err As Data.ErrorProto In results.Status.Errors
                        MessageEvents.Raise(EvtMsgType.ERR, $"Reason: {Err.Reason}, Message: {Err.Message}")
                    Next
                End If

                If Not IsNothing(results.Statistics.Load) Then
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Input File Count: {results.Statistics.Load.InputFiles}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Total Input Size: {BytesToString(results.Statistics.Load.InputFileBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Output Row Count: {results.Statistics.Load.OutputRows}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Total Output Size: {BytesToString(results.Statistics.Load.OutputBytes)}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Bad Record Count: {results.Statistics.Load.BadRecords}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Start Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.StartTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   End Time: {DateTimeOffset.FromUnixTimeMilliseconds(results.Statistics.EndTime).LocalDateTime().ToString("MM/dd/yyyy hh:mm:ss tt")}")
                    MessageEvents.Raise(EvtMsgType.NONE, $"   Total Execution Time: {TimeSpan.FromMilliseconds(results.Statistics.FinalExecutionDurationMs).Duration()}")
                End If

            Case JobState.Pending
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout. Job is in pending state")
            Case JobState.Running
                MessageEvents.Raise(EvtMsgType.INFO, $"Exiting job monitor due to > 60 second timeout")
        End Select

        BQClient.Dispose()
        Return True

    End If

    Return False

End Function

Public Sub UploadOnProgressChanged(ByVal process As Google.Apis.Upload.IUploadProgress)
    'Avoid divide by zero
    Try
        Dim currentPercent As Double = 0
        If process.BytesSent <> 0 Then currentPercent = (process.BytesSent / totalStreamSize)

        Dim timeElapsed As TimeSpan = DateTime.Now.Subtract(elapsedTime)
        Dim remainingTime As TimeSpan
        If timeElapsed.TotalSeconds <> 0 AndAlso currentPercent <> 0 Then
            remainingTime = TimeSpan.FromSeconds(CDbl((timeElapsed.TotalSeconds / currentPercent) - timeElapsed.TotalSeconds))
        Else
            remainingTime = TimeSpan.Zero
        End If

        Dim BytesSent As String = process.BytesSent.ToString()
        If process.BytesSent <> 0 Then BytesSent = BytesToString(process.BytesSent)

        MessageEvents.Raise(EvtMsgType.INFO, $"Status: {process.Status}, Sent: {BytesSent}/{BytesToString(totalStreamSize)}, Progress: {Math.Floor(currentPercent * 100)}%, Elapsed Time: {timeElapsed.ToString("hh\:mm\:ss")}, Est. Completion: {remainingTime.ToString("hh\:mm\:ss")}")

    Catch ex As Exception
        MessageEvents.Raise(EvtMsgType.ERR, $"{ex.Message}")
    End Try

End Sub

Private Shared Function BytesToString(ByVal byteCount As Long) As String
    Dim suf As String() = {"B", "KB", "MB", "GB", "TB", "PB", "EB"}
    If byteCount = 0 Then Return "0" & suf(0)
    Dim bytes As Long = Math.Abs(byteCount)
    Dim place As Integer = Convert.ToInt32(Math.Floor(Math.Log(bytes, 1024)))
    Dim num As Double = Math.Round(bytes / Math.Pow(1024, place), 1)
    Return (Math.Sign(byteCount) * num).ToString() & suf(place)
End Function

Upvotes: 0

Michael Sheldon

Reputation: 2057

Regarding your "how do I catch the exception" question, it seems like the callbacks occur asynchronously on another thread. If you throw an exception, it's going to be caught by whatever framework is calling the callback.

Searching around for similar questions, I found these answers that might help you: "Catching an exception thrown in an asynchronous callback", and this one, which shows how to update the UI on another thread from the upload progress received on a background thread: "Tracking upload progress of WebClient".
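
A minimal sketch of one way to act on that, assuming the asker's ExecuteUploadJobToTable/UploadOnResponseReceived structure (the _responseError field name is illustrative; UploadStatus and IUploadProgress.Exception come from Google.Apis.Upload): record the exception in a field inside the handler and check it, together with progress.Status and progress.Exception, after Upload() returns. progress.Exception may or may not carry the callback's exception depending on how the library handles it, so recording it yourself is the more reliable route.

    // Illustrative field to carry the callback's error back to the calling thread.
    private Exception _responseError;

    private void UploadOnResponseReceived(Job job)
    {
        try
        {
            job = PollUntilJobDone(job.JobReference, 5);
            // ... build errorMessageBuilder from job.Status exactly as in the question ...
            // if (errorMessageBuilder.Length > 0)
            //     _responseError = new BigQueryException(errorMessageBuilder.ToString());
        }
        catch (Exception e)
        {
            // Keep the exception instead of letting the upload framework swallow it.
            _responseError = e;
        }
    }

    public bool ExecuteUploadJobToTable(/* same parameters as in the question */)
    {
        // ... create the job and insertMediaUpload as in the question ...
        IUploadProgress progress = insertMediaUpload.Upload();

        // Compare against the UploadStatus enum instead of string-matching the status,
        // and surface whichever error is available.
        if (progress.Status == UploadStatus.Failed || _responseError != null)
        {
            Exception cause = _responseError ?? progress.Exception;
            throw new BigQueryException("Upload job failed", cause);
        }
        return true;
    }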

Upvotes: 2
