shab1r

Reputation: 43

How to update data in the BigQuery stream buffer?

My Workflow

I receive data in Pub/Sub, then a Cloud Function is triggered which inserts the data into BigQuery.

This is the code:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

exports.telemetryToBigQuery = (data, context) => {
  if (!data.data) {
    throw new Error('No telemetry data was provided!');
  }

  // Data comes in as base64
  console.log(`raw data: ${data.data}`);

  // Data gets decoded from base64 to string
  const dataDataDecode = Buffer.from(data.data, 'base64').toString();

  // Collect the positions of all semicolons in the decoded message
  const indexesSemicolons = [];
  for (let i = 0; i < dataDataDecode.length; i++) {
    if (dataDataDecode[i] === ';') {
      indexesSemicolons.push(i);
    }
  }

  if (indexesSemicolons.length === 14) {
    const brand = dataDataDecode.slice(0, indexesSemicolons[0]);
    const model = dataDataDecode.slice(indexesSemicolons[0] + 1, indexesSemicolons[1]);
    const result = dataDataDecode.slice(indexesSemicolons[1] + 1, indexesSemicolons[2]);

    async function insertRowsAsStream() {
      // Inserts the parsed fields into my_dataset:my_table via the streaming API
      const datasetId = 'put your dataset here';
      const tableId = 'put table id here';
      const rows = [
        {
          Brand: brand,
          Model: model,
          Result: result
        }
      ];

      // Insert data into a table
      await bigquery
        .dataset(datasetId)
        .table(tableId)
        .insert(rows);
      console.log(`Inserted ${rows.length} rows`);
    }

    // Return the promise so the function does not finish before the insert completes
    return insertRowsAsStream();
  } else {
    console.log('Invalid message');
    return;
  }
};

This data stays in the BigQuery streaming buffer for about 90 minutes, but I need to execute an update query that changes the Result column. This is not allowed and causes an error:

ApiError: UPDATE or DELETE statement over table pti-tag-copy.ContainerData2.voorinfo would affect rows in the streaming buffer, which is not supported at new ApiError

I need a way to update the Result column before the 90-minute buffer time has passed. Can you help me, please?

I read the following pages online

Life of a BigQuery stream

I read the answer to the following question. I think I understand the idea of what he is talking about, but I don't know how to execute it.

If I am correct, he is saying to stream my data to a temporary table and from there put it into a permanent table.

Stackoverflow DML Update bigQuery

Upvotes: 0

Views: 3687

Answers (3)

shab1r

Reputation: 43

I am now using the BigQuery "create a job" method.

The example can be found here.

It puts the data directly inside the table, so I don't have to wait 90 minutes for the streaming buffer.
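Roughly, the approach looks like this (a minimal sketch, assuming placeholder dataset/table names and the three columns from my question; a DML INSERT runs as a query job, so the rows never pass through the streaming buffer):

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertViaQueryJob(brand, model, result) {
  // Run the INSERT as a query job instead of a streaming insert,
  // so the rows can be updated right away.
  const [job] = await bigquery.createQueryJob({
    query: `
      INSERT INTO \`my_dataset.my_table\` (Brand, Model, Result)
      VALUES (@brand, @model, @result)`,
    params: { brand, model, result },
  });
  await job.getQueryResults(); // wait for the job to finish
  console.log(`Job ${job.id} completed`);
}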

Upvotes: 0

Jose Gutierrez Paliza

Reputation: 1428

To answer your question: yes, it says that you should stream the data to a temporary table, copy it to a permanent table, and enable an expiration time on the original (temporary) table. This means the temporary table will be deleted automatically after the expiration time has passed.
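A minimal sketch of that flow with the Node.js client, assuming hypothetical table names my_dataset.voorinfo_temp (the streamed table) and my_dataset.voorinfo (the permanent one) and a 2-hour expiration; adjust names and columns to your schema:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function moveBufferedRows() {
  // A SELECT can read rows that are still in the streaming buffer,
  // so copy them from the temporary table into the permanent one.
  await bigquery.query({
    query: `
      INSERT INTO \`my_dataset.voorinfo\` (Brand, Model, Result)
      SELECT Brand, Model, Result
      FROM \`my_dataset.voorinfo_temp\``,
  });

  // Let the temporary table expire on its own after a while.
  await bigquery.query({
    query: `
      ALTER TABLE \`my_dataset.voorinfo_temp\`
      SET OPTIONS (expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 2 HOUR))`,
  });
}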

Alternatively, you can change your filters so they do not include data that could still be in the streaming buffer. If you are using a partitioned table, you can add a WHERE clause to your UPDATE so that it only touches partitions older than 40 to 90 minutes, like:

WHERE _PARTITIONTIME < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 40 MINUTE)
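For example, a sketch of the full statement run from the Node.js client, assuming an ingestion-time partitioned table, an illustrative new value for Result, and the conservative 90-minute end of the window:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function updateOlderRows() {
  // Rows newer than the chosen interval may still be in the streaming
  // buffer, so the UPDATE only touches older partitions.
  await bigquery.query({
    query: `
      UPDATE \`pti-tag-copy.ContainerData2.voorinfo\`
      SET Result = @newResult
      WHERE _PARTITIONTIME < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE)`,
    params: { newResult: 'done' },
  });
}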

Upvotes: 0

guillaume blaquiere

Reputation: 75715

Yes, that's correct. While the data are in the streaming buffer, you can't use DML on them. The solution is to query the data (the streaming buffer can be read) and to write the transformed rows into another table. It could be a temporary table, as you say, from which you then sink them into a permanent table.
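A small sketch of that idea, assuming a hypothetical refined table my_dataset.voorinfo_refined and an illustrative correction of the Result column done in the SELECT itself, so no UPDATE against buffered rows is needed:

const { BigQuery } = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function refineBufferedRows() {
  // Read the raw table (streaming buffer included) and write the
  // transformed rows into a refined table in one pass.
  await bigquery.query({
    query: `
      INSERT INTO \`my_dataset.voorinfo_refined\` (Brand, Model, Result)
      SELECT
        Brand,
        Model,
        -- illustrative transformation instead of a later UPDATE
        IF(Result = 'pending', 'done', Result) AS Result
      FROM \`my_dataset.voorinfo\``,
  });
}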

You can also consider the streamed data from Pub/Sub as raw data that you want to keep, and then refine the data in other tables. It's a common data engineering pattern to have different layers of filtering and transformation up to the final, useful data (also called data marts).

Upvotes: 1
