Blazerg

Reputation: 849

Golang code is not inserting on BigQuery's table after I have created it from code

I have a BigQuery table with this schema:

name    STRING  NULLABLE    
age     INTEGER NULLABLE    
amount  INTEGER NULLABLE

And I can successfully insert into the table with this code:

ctx := context.Background()
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
    log.Fatal(err)
}

u := client.Dataset(dataSetID).Table("test_user").Uploader()

savers := []*bigquery.StructSaver{
    {Struct: test{Name: "Sylvanas", Age: 23, Amount: 123}},
}

if err := u.Put(ctx, savers); err != nil {
    log.Fatal(err)
}
fmt.Printf("rows inserted!!")

This works fine because the table already exists in BigQuery. What I want to do now is delete the table if it exists and create it again from code:

type test struct {
    Name   string
    Age    int
    Amount int
}

if err := client.Dataset(dataSetID).Table("test_user").Delete(ctx); err != nil {
    log.Fatal(err)
}

fmt.Printf("table deleted")

t := client.Dataset(dataSetID).Table("test_user")

// Infer table schema from a Go type.
schema, err := bigquery.InferSchema(test{})
if err != nil {
    log.Fatal(err)
}

if err := t.Create(ctx,
    &bigquery.TableMetadata{
        Name:           "test_user",
        Schema:         schema,
    }); err != nil {
    log.Fatal(err)
}

fmt.Printf("table created with the test schema")

This also works nicely: it deletes the table and recreates it with the schema inferred from my test struct.

The problem comes when I try to run the insert above after the delete/create process. No error is thrown, but no data is inserted (the insert works fine if I comment out the delete/create part).

What am I doing wrong? Do I need to commit the create-table transaction somehow before inserting, or do I need to close the database connection?

Upvotes: 1

Views: 1978

Answers (1)

Jofre

Reputation: 3898

According to this old answer, it might take up to 2 min for a BigQuery streaming buffer to be properly attached to a deleted and immediately re-created table.

I have run some tests, and in my case it took just a few seconds until the table was available, instead of the 2~5 min reported in other questions. The resulting code is quite different from yours, but the concepts should apply.

Instead of inserting the rows directly, I add them to a buffered channel and wait until I can verify that the current table is properly saving values before I start sending them.

I've used a much simpler struct to run my tests (so it was easier to write the code):

type row struct {
    ByteField []byte
}

I generated my rows the following way:

func generateRows(rows chan<- *row) {
    for {
        randBytes := make([]byte, 100)
        _, _ = rand.Read(randBytes)
        rows <- &row{randBytes}
        time.Sleep(time.Millisecond * 500) // use whatever frequency you need to insert rows at
    }
}

Notice how I'm sending the rows to the channel. Instead of generating them, you just have to get them from your data source.

The next part is finding a way to check whether the table is properly saving the rows. What I did was try to insert one of the buffered rows into the table, read that row back, and verify that everything is OK. If the row is not returned properly, it goes back into the buffer.

func unreadyTable(rows chan *row) bool {
    client, err := bigquery.NewClient(context.Background(), project)
    if err != nil {return true}

    r := <-rows // get a row to try to insert       
    uploader := client.Dataset(dataset).Table(table).Uploader()
    if err := uploader.Put(context.Background(), r); err != nil {rows <- r;return true}

    i, err := client.Query(fmt.Sprintf("select * from `%s.%s.%s`", project, dataset, table)).Read(context.Background())
    if err != nil {rows <- r; return true}
    var testRow []bigquery.Value
    if err := i.Next(&testRow); err != nil {rows <- r;return true}
    if reflect.DeepEqual(&row{testRow[0].([]byte)}, r) {return false} // there's probably a better way to check if it's equal
    rows <- r;return true
}

With a function like that, we only need to add for ; unreadyTable(rows); time.Sleep(time.Second) {} to block until it's safe to insert the rows.

Finally, we put everything together:

func main() {

    // initialize a channel where the rows will be sent
    rows := make(chan *row, 1000) // make it big enough to hold several minutes of rows

    // start generating rows to be inserted
    go generateRows(rows)

    // create the BigQuery client
    client, err := bigquery.NewClient(context.Background(), project)
    if err != nil {/* handle error */}

    // delete the previous table
    if err := client.Dataset(dataset).Table(table).Delete(context.Background()); err != nil {/* handle error */}

    // create the new table
    schema, err := bigquery.InferSchema(row{})
    if err != nil {/* handle error */}
    if err := client.Dataset(dataset).Table(table).Create(context.Background(), &bigquery.TableMetadata{Schema: schema}); err != nil {/* handle error */}

    // wait for the table to be ready
    for ; unreadyTable(rows); time.Sleep(time.Second) {}

    // once it's ready, upload indefinitely
    for {
        if len(rows) > 0 { // if there are uninserted rows, create a batch and insert them
            uploader := client.Dataset(dataset).Table(table).Uploader()
            insert := make([]*row, min(500, len(rows))) // create a batch of all the rows on buffer, up to 500
            for i := range insert {insert[i] = <-rows}
            go func(insert []*row) { // do the actual insert async
                if err := uploader.Put(context.Background(), insert); err != nil {/* handle error */}
            }(insert)
        } else { // if there are no rows waiting to be inserted, wait and check again
            time.Sleep(time.Second)
        }
    }
}

Note: Since math.Min() only works on float64 values, I had to include func min(a,b int)int{if a<b{return a};return b}.

Here's my full working example.

Upvotes: 3
