bobby j

Reputation: 93

How do I write a query result directly to a Google Cloud Storage bucket?

from google.cloud import bigquery

client = bigquery.Client()

query = """select * from emp where emp_name=@emp_name"""
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')]
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = query_params

query_job = client.query(query, job_config=job_config)
result = query_job.result()

How can I write the result to Google Cloud Storage directly, instead of first writing it to a CSV file and then uploading that file to a Cloud Storage bucket?

Upvotes: 9

Views: 20357

Answers (8)

Jon

Reputation: 177

This is probably what you're looking for: https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#export_data_statement

The EXPORT DATA statement exports the results of a query to an external storage location. The storage location must be Cloud Storage.

Upvotes: 0

M80

Reputation: 994

from google.cloud import bigquery
from google.oauth2 import service_account

# Authenticate with a service account key
credentials = service_account.Credentials.from_service_account_file(
    "dev-key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# EXPORT DATA writes the query result straight to the Cloud Storage URI
bq_export_to_gs = """
EXPORT DATA OPTIONS(
  uri='gs://my-bucket/logs/edo/dengg_audit/bq-demo/temp4/*',
  format='CSV',
  overwrite=true,
  header=false,
  field_delimiter='^') AS
select col1, col2 from `project.schema.table` where clientguid = '1234' limit 10
"""

query_job = client.query(bq_export_to_gs)
results = query_job.result()  # waits for the export to finish
for row in results:
    print(row)

Upvotes: 4

muntasir kabir

Reputation: 258

Solution: write a BigQuery result directly to a Google Cloud Storage bucket

from google.cloud import bigquery
from google.cloud import storage

def export_to_gcs():
    QUERY = "SELECT * FROM TABLE where CONDITION"  # change the table and where condition
    bq_client = bigquery.Client()
    query_job = bq_client.query(QUERY)  # BigQuery API request
    rows_df = query_job.result().to_dataframe()

    storage_client = storage.Client()  # Storage API request
    bucket = storage_client.get_bucket("BUCKET_NAME")  # change the bucket name
    blob = bucket.blob('temp/Add_to_Cart.csv')
    blob.upload_from_string(rows_df.to_csv(sep=';', index=False, encoding='utf-8'),
                            content_type='application/octet-stream')
    return "success"

Upvotes: 2

Soumendra Mishra

Reputation: 3663

You can try this option:

from google.cloud import bigquery

bigquery_client = bigquery.Client()
uri = "gs://my-bucket/file.csv"
table_ref = bigquery_client.dataset("my-dataset").table("my-table")
extract_job = bigquery_client.extract_table(table_ref, uri)  # exports the existing table to GCS as CSV
extract_job.result()  # waits for the export to complete

Upvotes: 0

Amiteshwar Singh

Reputation: 1

#THIS IS THE CODE I AM RUNNING

# Set the destination table
for i in range(1, 13):
    table_ref = client.dataset("newdataset").table("chicago_months_increment")
    job_config.destination = table_ref
    job_config.allow_large_results = True

    query_job = client.query(
        'SELECT * FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` '
        'WHERE (Select EXTRACT(MONTH from trip_start_timestamp)) = i;',
        location='US',  # Location must match dataset
        job_config=job_config)
    rows = list(query_job)  # Waits for the query to finish

    query_job.result()

    # Export table to GCS
    destination_uri = "gs://monthly-data/month-" + str(i) + "-*.csv"
    dataset_ref = client.dataset("newdataset", project="chicago-project-247714")
    table_ref = dataset_ref.table("chicago_months_increment")

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location='US')
    extract_job.result()  # Waits for job to complete
    client.delete_table(table_ref)  # Deletes table in BQ
#ERROR I AM GETTING
---------------------------------------------------------------------------
BadRequest                                Traceback (most recent call last)
<ipython-input-5-e176648eba58> in <module>()
      9     location='US', # Location must match dataset
     10     job_config=job_config)
---> 11 rows = list(query_job) # Waits for the query to finish
     12
     13

/home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in __iter__(self)
   2988
   2989     def __iter__(self):
-> 2990         return iter(self.result())
   2991
   2992

/home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in result(self, timeout, page_size, retry)
   2875             If the job did not complete in the given timeout.
   2876         """
-> 2877         super(QueryJob, self).result(timeout=timeout)
   2878         # Return an iterator instead of returning the job.
   2879         if not self._query_results:

/home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in result(self, timeout, retry)
    731             self._begin(retry=retry)
    732         # TODO: modify PollingFuture so it can pass a retry argument to done().
--> 733         return super(_AsyncJob, self).result(timeout=timeout)
    734
    735     def cancelled(self):

/home/amiteshwar/.local/lib/python2.7/site-packages/google/api_core/future/polling.pyc in result(self, timeout)
    125             # pylint: disable=raising-bad-type
    126             # Pylint doesn't recognize that this is valid in this case.
--> 127             raise self._exception
    128
    129         return self._result

BadRequest: 400 Unrecognized name: i at [1:125]
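
The 400 error occurs because i is a Python loop variable but appears as a literal identifier inside the SQL string, so BigQuery cannot resolve it. One way around this, sketched below under the assumption that the surrounding loop and job_config stay as above, is to bind the month as a query parameter (mirroring the ScalarQueryParameter approach from the question); the parameter name @month is only illustrative:

# Hedged sketch: pass the loop variable as a query parameter instead of
# embedding the name `i` in the SQL text.
job_config.query_parameters = [
    bigquery.ScalarQueryParameter("month", "INT64", i)
]
query_job = client.query(
    'SELECT * FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` '
    'WHERE EXTRACT(MONTH FROM trip_start_timestamp) = @month',
    location='US',
    job_config=job_config)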

Upvotes: 0

Porada Kev

Reputation: 513

@dsesto's answer was quite useful for me. I used his code and added a few lines to query BigQuery, write the result to a table, export that table to GCS, and import the result into a Dask DataFrame. The code is wrapped into a function.

def df_from_bq(query: str, table=None, compute=False):

    from time import gmtime, strftime
    from google.cloud import bigquery  #, storage
    import dask.dataframe as dd
    import gcsfs

    client = bigquery.Client.from_service_account_json('YOUR_PATH')  # Authentication to BQ using a service key
    project = 'YOUR_PROJECT'

    table_name = 'result_' + str(strftime("%Y%m%d_%H%M%S", gmtime())) if table is None else table  # Creates a custom table name if none is given

    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset("YOUR_DATASET").table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Creates the table with the query result. Overwrites it if the table exists

    query_job = client.query(
        query,
        location='US',
        job_config=job_config)
    query_job.result()
    print('Query results loaded to table {}'.format(table_ref.path))

    destination_uri = "gs://YOUR_BUCKET/{}".format(table_name + '_*' + '.csv')
    dataset_ref = client.dataset("YOUR_DATASET", project=project)
    table_ref = dataset_ref.table(table_name)

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location='US')
    extract_job.result()  # Extracts results to GCS

    print('Query results extracted to GCS: {}'.format(destination_uri))

    client.delete_table(table_ref)  # Deletes table in BQ

    print('Table {} deleted'.format(table_name))

    gcs = gcsfs.GCSFileSystem(project=project, token='cache')
    df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name + '_*' + '.csv'), storage_options={'token': gcs.session.credentials})

    # storage_client = storage.Client.from_service_account_json('C:\\Users\\o.korshun\\Documents\\o.korshun.json')
    # bucket = storage_client.get_bucket('plarium-analytics')
    # blob = bucket.blob(table_name + '.csv')
    # blob.delete()  # Uncomment if you need to delete the blob after the DataFrame is created
    # print('Blob {} deleted'.format(table_name + '.csv'))

    print('Results imported to DD!')

    return df if not compute else df.compute().reset_index(drop=True)  # reset_index(in_place=True) is not valid pandas and would return None

Note that the intermediate table in BQ is deleted after the result has been exported to Cloud Storage.
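
A hypothetical call to the helper, just to illustrate the signature (the query, dataset, and bucket placeholders are the ones used inside the function):

ddf = df_from_bq("SELECT name, value FROM `YOUR_PROJECT.YOUR_DATASET.some_table`")
print(ddf.head())  # Dask reads lazily; head() materialises only the first partition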

Upvotes: 5

dsesto

Reputation: 8178

Depending on your specific use case (frequency of the export, size of the exports, etc.), the solutions proposed in the answer by @GrahamPolley may work for you, although they would take more development and attention.

The current options for handling query results are to write them to a table or to download them locally, and even downloading directly to CSV has some limitations. There is therefore no way to write query results to GCS in CSV format directly. However, there is a two-step solution consisting of:

  1. Write query results to a BQ table
  2. Export data from the BQ table to a CSV file in GCS. Note that this feature has some limitations too, but they are less restrictive.

The following Python code can give you an idea of how to perform that task:

from google.cloud import bigquery
client = bigquery.Client()

# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
    location='US', # Location must match dataset
    job_config=job_config)
rows = list(query_job)  # Waits for the query to finish


# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US')
extract_job.result()  # Waits for job to complete

Note that, after that, you would have to delete the intermediate table (you can also do that programmatically). This may not be the best solution if you have to automate the process (if that is your use case, maybe you should explore @Graham's solutions), but it will do the trick for a simple scenario.
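
For reference, the programmatic cleanup is a one-liner using the same client and table_ref as in the snippet above (a sketch, not part of the original example):

client.delete_table(table_ref)  # removes the intermediate table once the extract job has finished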

Upvotes: 15

Graham Polley

Reputation: 14791

BigQuery does not support writing its query results directly to GCS. You will have to write the results to a table, and then export the table to GCS after it's been materialised. You could possibly use Cloud Composer to orchestrate this for you.

Or, you could use a Dataflow pipeline to achieve your desired result in one go, but this is more work and will cost more money. The idea would be to write a pipeline that reads from BigQuery using your SQL query and then writes the results to GCS. It will also be slower.
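
A rough sketch of that Dataflow approach, assuming Apache Beam's Python SDK; the project, bucket, and query below are placeholders, and the CSV formatting is deliberately naive:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",            # use "DirectRunner" to test locally
    project="YOUR_PROJECT",
    temp_location="gs://YOUR_BUCKET/tmp",
    region="us-central1",
)

with beam.Pipeline(options=options) as p:
    (p
     | "ReadFromBQ" >> beam.io.ReadFromBigQuery(
           query="SELECT * FROM `YOUR_PROJECT.dataset.emp` WHERE emp_name = 'name'",
           use_standard_sql=True)
     | "ToCsvLine" >> beam.Map(lambda row: ",".join(str(v) for v in row.values()))  # each row is a dict
     | "WriteToGCS" >> beam.io.WriteToText("gs://YOUR_BUCKET/emp_export",
                                           file_name_suffix=".csv"))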

Upvotes: 8
