Marcin_S

Reputation: 539

Google BigQuery: How to dynamically add a column to a BigQuery result

I have an application which queries our BQ datasets and stores the result in BQ tables. My code:

    BigQuery bigquery = bigQuery();
    TableId destinationTable = TableId.of(datasetName, TableName);

    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
            .setDestinationTable(destinationTable)
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .build();
    TableResult results = bigquery.query(queryConfig);

While writing the result to the BQ dataset, I want to append a column to every row, similar to this:

queryConfig.addNewColumnToEveryRow("ID", "123");

How can I do that?

Upvotes: 1

Views: 883

Answers (2)

Kabilan Mohanraj

Reputation: 1906

The efficient solution is to change the query itself, as shown in @Brent's answer. The other solution, mentioned by @Mikhail, is to post-process the result returned by the query execution. The code snippet below shows the programmatic way to post-process the result (add a new column) and load the data into BigQuery.

The flow of the program is as follows:

  1. Execute the query and obtain the results.
  2. Iterate over the result and construct a JSON array.
  3. Write the JSON array to a local file in NDJSON format.
  4. Load the local file into a BigQuery table by creating a Batch load job (implemented below). You can also use the streaming API to load the data.
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;


public class AddNewColumn {

    public static void main(String[] args) throws IOException {
        runSimpleQuery();
    }

    public static void runSimpleQuery() throws IOException {

        String query = "SELECT corpus, SUM(word_count) as word_count FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus ORDER BY word_count LIMIT 5;";
        simpleQuery(query);
    }

    public static void simpleQuery(String query) throws IOException {
        try {
            // Initialize client that will be used to send requests. This client only needs to be created
            // once, and can be reused for multiple requests.
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

            // Create the query job.
            QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

            // Execute the query.
            TableResult result = bigquery.query(queryConfig);
            System.out.println("\nQuery ran successfully");


            // Construct JSON array from the individual rows
            ArrayList<String> columnNames = new ArrayList<String>();
            result.getSchema().getFields().forEach(field ->  columnNames.add(field.getName())); // get column names

            JsonArray jsonArray = new JsonArray();

            result.iterateAll().forEach(row -> {
                JsonObject jsonObject = new JsonObject();
                jsonObject.addProperty("ID", 123); // the new column appended to every row
                columnNames.forEach(
                    column -> jsonObject.addProperty(column, row.get(column).getValue().toString()));
                jsonArray.add(jsonObject);
            });

            // Write the JSON array to a temporary file in NDJSON format.
            // try-with-resources ensures the writer is closed even if a write fails.
            try (FileWriter file = new FileWriter("./tempfile.json")) {
                jsonArray.forEach(jsonElement -> {
                    try {
                        file.write(jsonElement.toString());
                        file.write("\n");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }

            System.out.println("Data written to temporary file.");

            // Create a load job to insert data
            // TODO: Change the destination dataset and table information.
            String datasetName = "MY_DATASET_NAME";
            String tableName = "MY_TABLE_NAME";
            Path jsonPath = FileSystems.getDefault().getPath(".", "tempfile.json");
            insertDataIntoDestinationTable(datasetName, tableName, jsonPath, FormatOptions.json());

        } catch (BigQueryException | InterruptedException e) {
            System.out.println("Query did not run \n" + e.toString());
        }
    }

    private static void insertDataIntoDestinationTable(String datasetName, String tableName, Path jsonPath, FormatOptions formatOptions) throws InterruptedException, IOException {
        try {
            // Initialize client that will be used to send requests. This client only needs to be created
            // once, and can be reused for multiple requests.
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            TableId tableId = TableId.of(datasetName, tableName);

            WriteChannelConfiguration writeChannelConfiguration =
                WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();

            // The location and JobName must be specified; other fields can be auto-detected.
            String jobName = "jobId_" + UUID.randomUUID().toString();
            JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

            // Imports a local file into a table.
            try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
                OutputStream stream = Channels.newOutputStream(writer)) {
                Files.copy(jsonPath.toFile(), stream);
            }

            // Get the Job created by the TableDataWriteChannel and wait for it to complete.
            Job job = bigquery.getJob(jobId);
            Job completedJob = job.waitFor();
            if (completedJob == null) {
                System.out.println("Job not executed since it no longer exists.");
                return;
            } else if (completedJob.getStatus().getError() != null) {
                System.out.println(
                    "BigQuery was unable to load local file to the table due to an error: \n"
                        + completedJob.getStatus().getError());
                return;
            }
            
        } catch (BigQueryException e) {
            System.out.println("Local file not loaded. \n" + e.toString());
        }


    }
}
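The streaming API mentioned in step 4 is an alternative to the batch load job above. A minimal sketch using the client's `insertAll` streaming API is shown below; the dataset/table names and the sample row values are placeholders, and the `withIdColumn` helper is a hypothetical convenience for appending the extra column to each row:

```java
import java.util.HashMap;
import java.util.Map;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;

public class StreamRowsWithId {

    // Pure helper: copy the original row values and append the extra "ID" column.
    static Map<String, Object> withIdColumn(Map<String, Object> row, String id) {
        Map<String, Object> out = new HashMap<>(row);
        out.put("ID", id);
        return out;
    }

    public static void main(String[] args) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of("MY_DATASET_NAME", "MY_TABLE_NAME");

        // Placeholder row; in practice this would come from the query result.
        Map<String, Object> row = new HashMap<>();
        row.put("corpus", "hamlet");
        row.put("word_count", 5318);

        // addRow can be called repeatedly on the builder to batch several rows per request.
        InsertAllResponse response = bigquery.insertAll(
            InsertAllRequest.newBuilder(tableId)
                .addRow(withIdColumn(row, "123"))
                .build());
        if (response.hasErrors()) {
            System.out.println("Errors occurred: " + response.getInsertErrors());
        }
    }
}
```

Note that streamed rows land in the streaming buffer and are billed per insert, so for a one-off copy like this the batch load job above is usually the cheaper choice.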

Output: The query results have been successfully inserted into the destination table.

Upvotes: 0

Brent Worden

Reputation: 10974

This should be possible by adding it to your query string:

String query = "SELECT yourOtherFields, 123 AS ID FROM yourSource";
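Combined with the destination-table setup from the question, the whole thing could look roughly like this. This is a sketch, not the asker's exact code: the dataset/table names are placeholders, and the `withIdColumn` helper that splices the constant column into the SELECT list is hypothetical:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;

public class QueryWithIdColumn {

    // Pure helper: inject a constant ID column into the SELECT list.
    static String withIdColumn(String selectList, String id, String source) {
        return "SELECT " + selectList + ", " + id + " AS ID FROM " + source;
    }

    public static void main(String[] args) throws InterruptedException {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId destinationTable = TableId.of("MY_DATASET_NAME", "MY_TABLE_NAME");

        String query = withIdColumn("corpus, word_count", "123",
            "`bigquery-public-data.samples.shakespeare`");

        // Same destination-table configuration as in the question; the extra
        // column is produced by the query itself, so no post-processing is needed.
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
            .setDestinationTable(destinationTable)
            .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
            .build();
        TableResult results = bigquery.query(queryConfig);
        System.out.println("Rows written: " + results.getTotalRows());
    }
}
```

Because the column is computed inside the query, BigQuery writes it directly to the destination table in one job, with no extra data movement.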

Upvotes: 1
