Reputation: 539
I have an application that queries our BQ datasets and stores the results to BQ tables. My code:
BigQuery bigquery = bigQuery();
TableId destinationTable = TableId.of(datasetName, TableName);
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
        .setDestinationTable(destinationTable)
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
        .build();
TableResult results = bigquery.query(queryConfig);
While writing the result to the BQ dataset, I want to append a column to every row, similar to this:
queryConfig.addNewColumnToEveryRow("ID", "123");
How can I do that?
Upvotes: 1
Views: 883
Reputation: 1906
The efficient solution is to change the query itself, as shown in @Brent's answer. The other solution, mentioned by @Mikhail, is to post-process the result returned by the query execution. Please refer to the code snippet below for the programmatic way to post-process the result (add a new column) and load the data into BigQuery.
The flow of the program is as follows:
1. Execute the query and fetch the results.
2. Read the column names from the result schema and build one JSON object per row, adding the new ID column to each.
3. Write the JSON objects to a temporary file in NDJSON (newline-delimited JSON) format.
4. Load the temporary file into the destination table with a load job.
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
public class AddNewColumn {

    public static void main(String[] args) throws IOException {
        runSimpleQuery();
    }

    public static void runSimpleQuery() throws IOException {
        String query = "SELECT corpus, SUM(word_count) as word_count FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus ORDER BY word_count LIMIT 5;";
        simpleQuery(query);
    }

    public static void simpleQuery(String query) throws IOException {
        try {
            // Initialize client that will be used to send requests. This client only needs to be
            // created once, and can be reused for multiple requests.
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

            // Create the query job.
            QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query).build();

            // Execute the query.
            TableResult result = bigquery.query(queryConfig);
            System.out.println("\nQuery ran successfully");

            // Get the column names from the result schema.
            ArrayList<String> columnNames = new ArrayList<>();
            result.getSchema().getFields().forEach(field -> columnNames.add(field.getName()));

            // Construct a JSON object for each row, adding the new "ID" column to every row.
            JsonArray jsonArray = new JsonArray();
            result.iterateAll().forEach(row -> {
                JsonObject jsonObject = new JsonObject();
                jsonObject.addProperty("ID", 123); // the extra column
                columnNames.forEach(
                        column -> jsonObject.addProperty(column, row.get(column).getValue().toString()));
                jsonArray.add(jsonObject);
            });

            // Write the JSON array to a temporary file in NDJSON format (one JSON object per line).
            FileWriter file = new FileWriter("./tempfile.json");
            jsonArray.forEach(jsonElement -> {
                try {
                    file.write(jsonElement.toString());
                    file.write("\n");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            file.close();
            System.out.println("Data written to temporary file.");

            // Create a load job to insert the data into the destination table.
            // TODO: Change the destination dataset and table information.
            String datasetName = "MY_DATASET_NAME";
            String tableName = "MY_TABLE_NAME";
            Path jsonPath = FileSystems.getDefault().getPath(".", "tempfile.json");
            insertDataIntoDestinationTable(datasetName, tableName, jsonPath, FormatOptions.json());
        } catch (BigQueryException | InterruptedException e) {
            System.out.println("Query did not run \n" + e.toString());
        }
    }

    private static void insertDataIntoDestinationTable(
            String datasetName, String tableName, Path jsonPath, FormatOptions formatOptions)
            throws InterruptedException, IOException {
        try {
            // Initialize client that will be used to send requests. This client only needs to be
            // created once, and can be reused for multiple requests.
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            TableId tableId = TableId.of(datasetName, tableName);
            WriteChannelConfiguration writeChannelConfiguration =
                    WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();

            // The location and job name must be specified; other fields can be auto-detected.
            String jobName = "jobId_" + UUID.randomUUID().toString();
            JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();

            // Imports the local file into the table.
            try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
                    OutputStream stream = Channels.newOutputStream(writer)) {
                Files.copy(jsonPath.toFile(), stream);
            }

            // Get the Job created by the TableDataWriteChannel and wait for it to complete.
            Job job = bigquery.getJob(jobId);
            Job completedJob = job.waitFor();
            if (completedJob == null) {
                System.out.println("Job not executed since it no longer exists.");
                return;
            } else if (completedJob.getStatus().getError() != null) {
                System.out.println(
                        "BigQuery was unable to load the local file to the table due to an error: \n"
                                + completedJob.getStatus().getError());
                return;
            }
        } catch (BigQueryException e) {
            System.out.println("Local file not loaded. \n" + e.toString());
        }
    }
}
Output: The query results have been successfully inserted into the destination table.
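If writing a temporary file is undesirable, the same post-processing idea can also be sketched with the streaming insertAll API instead of a load job. This is a minimal, illustrative sketch, not part of the solution above: it assumes the destination table (the MY_DATASET_NAME.MY_TABLE_NAME placeholders below) already exists with a schema matching the query result plus the extra ID column, and note that streaming inserts are billed differently from load jobs.

import java.util.HashMap;
import java.util.Map;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;

public class AddNewColumnStreaming {
    public static void main(String[] args) throws InterruptedException {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        String query = "SELECT corpus, SUM(word_count) AS word_count "
                + "FROM `bigquery-public-data.samples.shakespeare` "
                + "GROUP BY corpus ORDER BY word_count LIMIT 5";
        TableResult result = bigquery.query(QueryJobConfiguration.newBuilder(query).build());

        // Assumed destination table; it must already exist with a matching schema.
        TableId tableId = TableId.of("MY_DATASET_NAME", "MY_TABLE_NAME");
        InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);

        // Build one row payload per result row, adding the extra "ID" column.
        result.iterateAll().forEach(row -> {
            Map<String, Object> content = new HashMap<>();
            content.put("ID", 123); // the extra column appended to every row
            content.put("corpus", row.get("corpus").getStringValue());
            content.put("word_count", row.get("word_count").getLongValue());
            request.addRow(content);
        });

        InsertAllResponse response = bigquery.insertAll(request.build());
        if (response.hasErrors()) {
            System.out.println("Some rows failed to insert: " + response.getInsertErrors());
        }
    }
}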
Upvotes: 0
Reputation: 10974
This should be possible by adding it to your query string:
String query = "SELECT yourOtherFields, 123 AS ID FROM yourSource";
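Plugged into the job configuration from the question, only the query string changes; the destination table and WRITE_APPEND write disposition stay exactly as they were (yourOtherFields and yourSource are placeholders from the line above):

String query = "SELECT yourOtherFields, 123 AS ID FROM yourSource";
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
        .setDestinationTable(TableId.of(datasetName, TableName))
        .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
        .build();
TableResult results = bigquery.query(queryConfig);

Every row the query returns then carries the constant ID column, and BigQuery appends it to the destination table along with the other fields.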
Upvotes: 1