Reputation: 93
from google.cloud import bigquery
query = """ select * from emp where emp_name=@emp_name"""
query_params = [bigquery.ScalarQueryParameter('emp_name', 'STRING', 'name')]
job_config = bigquery.QueryJobConfig()
job_config.query_parameters = query_params
client = bigquery.Client()
query_job = client.query(query, job_config=job_config)
result = query_job.result()
How can I write the result directly to Google Cloud Storage, instead of first writing it to a CSV file and then uploading that file to a Cloud Storage bucket?
Upvotes: 9
Views: 20357
Reputation: 177
This is probably what you're looking for: https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#export_data_statement
The EXPORT DATA statement exports the results of a query to an external storage location. The storage location must be Cloud Storage.
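For illustration only, a minimal sketch of running such a statement from the Python client; the bucket, project, dataset and table names are placeholders, not values from the question:
from google.cloud import bigquery

client = bigquery.Client()

# EXPORT DATA writes the query result straight to the given GCS URI pattern.
export_sql = """
EXPORT DATA OPTIONS(
  uri='gs://YOUR_BUCKET/export/*.csv',
  format='CSV',
  overwrite=true,
  header=true) AS
SELECT * FROM `YOUR_PROJECT.YOUR_DATASET.emp`
"""

client.query(export_sql).result()  # waits for the export job to finish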
Upvotes: 0
Reputation: 994
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file("dev-key.json", scopes=["https://www.googleapis.com/auth/cloud-platform"],)
client = bigquery.Client(credentials=credentials, project=credentials.project_id,)
bq_export_to_gs = """
EXPORT DATA OPTIONS(
uri='gs://my-bucket/logs/edo/dengg_audit/bq-demo/temp4/*',
format='CSV',
overwrite=true,
header=false,
field_delimiter='^') AS
select col1, col2 from `project.schema.table` where clientguid = '1234' limit 10
"""
query_job = client.query(bq_export_to_gs)
results = query_job.result()
for row in results:
    print(row)
Upvotes: 4
Reputation: 258
Solution: write the BigQuery result directly to a Google Cloud Storage bucket
from google.cloud import bigquery
from google.cloud import storage
def export_to_gcs():
    QUERY = "SELECT * FROM TABLE where CONDITION"  # change the table and where condition
    bq_client = bigquery.Client()
    query_job = bq_client.query(QUERY)  # BigQuery API request
    rows_df = query_job.result().to_dataframe()

    storage_client = storage.Client()  # Storage API request
    bucket = storage_client.get_bucket(BUCKETNAME)  # change the bucket name
    blob = bucket.blob('temp/Add_to_Cart.csv')
    blob.upload_from_string(rows_df.to_csv(sep=';', index=False, encoding='utf-8'), content_type='application/octet-stream')

    return "success"
Upvotes: 2
Reputation: 3663
You can try this option:
from google.cloud import bigquery
bigqueryClient = bigquery.Client()
uri = "gs://my-bucket/file.csv"
tableRef = bigqueryClient.dataset("my-dataset").table("my-table")
bqJob = bigqueryClient.extract_table(tableRef, uri)
bqJob.result()
Upvotes: 0
Reputation: 1
# THIS IS THE CODE I AM RUNNING
# Set the destination table
for i in range(1, 13):
    table_ref = client.dataset("newdataset").table("chicago_months_increment")
    job_config.destination = table_ref
    job_config.allow_large_results = True

    query_job = client.query(
        'SELECT * FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE (Select EXTRACT(MONTH from trip_start_timestamp) )=i;',
        location='US',  # Location must match dataset
        job_config=job_config)
    rows = list(query_job)  # Waits for the query to finish
    query_job.result()

    # Export table to GCS
    destination_uri = "gs://monthly-data/month-" + i + "-*.csv"
    dataset_ref = client.dataset("newdataset", project="chicago-project-247714")
    table_ref = dataset_ref.table("chicago_months_increment")
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location='US')
    extract_job.result()  # Waits for job to complete

    client.delete_table(table_ref)  # Deletes table in BQ
# ERROR I AM GETTING
---------------------------------------------------------------------------
BadRequest                                Traceback (most recent call last)
<ipython-input-5-e176648eba58> in <module>()
      9                          location='US',  # Location must match dataset
     10                          job_config=job_config)
---> 11 rows = list(query_job)  # Waits for the query to finish
     12
     13

/home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in __iter__(self)
   2988
   2989     def __iter__(self):
-> 2990         return iter(self.result())
   2991
   2992

/home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in result(self, timeout, page_size, retry)
   2875                 If the job did not complete in the given timeout.
   2876         """
-> 2877         super(QueryJob, self).result(timeout=timeout)
   2878         # Return an iterator instead of returning the job.
   2879         if not self._query_results:

/home/amiteshwar/.local/lib/python2.7/site-packages/google/cloud/bigquery/job.pyc in result(self, timeout, retry)
    731             self._begin(retry=retry)
    732         # TODO: modify PollingFuture so it can pass a retry argument to done().
--> 733         return super(_AsyncJob, self).result(timeout=timeout)
    734
    735     def cancelled(self):

/home/amiteshwar/.local/lib/python2.7/site-packages/google/api_core/future/polling.pyc in result(self, timeout)
    125             # pylint: disable=raising-bad-type
    126             # Pylint doesn't recognize that this is valid in this case.
--> 127             raise self._exception
    128
    129         return self._result

BadRequest: 400 Unrecognized name: i at [1:125]
Upvotes: 0
Reputation: 513
@dsesto's answer was quite useful for me. I used his code and added a few lines to query BigQuery, write the result to a table, export it to GCS, and import the result into a Dask DataFrame. The code is wrapped in a function.
def df_from_bq(query: str, table=None, compute=False):
    from time import gmtime, strftime
    from google.cloud import bigquery  # , storage
    import dask.dataframe as dd
    import gcsfs

    client = bigquery.Client.from_service_account_json('YOUR_PATH')  # Authentication for BQ using a service key
    project = 'YOUR_PROJECT'
    table_name = 'result_' + str(strftime("%Y%m%d_%H%M%S", gmtime())) if table is None else table  # Creates a custom table name if none is given

    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset("YOUR_DATASET").table(table_name)
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Creates the table with the query result; overwrites it if the table exists

    query_job = client.query(
        query,
        location='US',
        job_config=job_config)
    query_job.result()
    print('Query results loaded to table {}'.format(table_ref.path))

    destination_uri = "gs://YOUR_BUCKET/{}".format(table_name + '_*' + '.csv')
    dataset_ref = client.dataset("YOUR_DATASET", project=project)
    table_ref = dataset_ref.table(table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location='US')
    extract_job.result()  # Extracts results to GCS
    print('Query results extracted to GCS: {}'.format(destination_uri))

    client.delete_table(table_ref)  # Deletes table in BQ
    print('Table {} deleted'.format(table_name))

    gcs = gcsfs.GCSFileSystem(project=project, token='cache')
    df = dd.read_csv('gcs://YOUR_BUCKET/{}'.format(table_name + '_*' + '.csv'), storage_options={'token': gcs.session.credentials})

    # storage_client = storage.Client.from_service_account_json('C:\\Users\o.korshun\Documents\o.korshun.json')
    # bucket = storage_client.get_bucket('plarium-analytics')
    # blob = bucket.blob(table_name + '.csv')
    # blob.delete()  # Uncomment if you need to delete the blob after the DataFrame is created
    # print('Blob {} deleted'.format(table_name + '.csv'))

    print('Results imported to DD!')
    return df if compute == False else df.compute().reset_index()
Note that the intermediate table in BigQuery is deleted after the result has been exported to Cloud Storage.
Upvotes: 5
Reputation: 8178
Depending on your specific use case (frequency of the export, size of the exports, etc.), the solutions proposed in the answer by @GrahamPolley may work for you, although they would take more development and attention.
Currently, query results can only be written to a table or downloaded locally, and even direct download to CSV has some limitations. Therefore, it is not possible to write query results to GCS in CSV format directly. However, there is a two-step solution: first write the query results to a BigQuery table, then export that table to GCS.
The following Python code can give you an idea of how to perform that task:
from google.cloud import bigquery
client = bigquery.Client()
# Write query results to a new table
job_config = bigquery.QueryJobConfig()
table_ref = client.dataset("DATASET").table("TABLE")
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
query_job = client.query(
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10',
    location='US',  # Location must match dataset
    job_config=job_config)
rows = list(query_job) # Waits for the query to finish
# Export table to GCS
destination_uri = "gs://BUCKET/FILE.CSV"
dataset_ref = client.dataset("DATASET", project="PROJECT_ID")
table_ref = dataset_ref.table("TABLE")
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location='US')
extract_job.result() # Waits for job to complete
Note that, afterwards, you would have to delete the table (which you can also do programmatically, as sketched below). This may not be the best solution if you need to automate the process (if that is your use case, you may be better off exploring @Graham's solutions), but it will do the trick for a simple scenario.
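For completeness, a minimal sketch of the programmatic cleanup, reusing the same "DATASET" and "TABLE" placeholder names as in the snippet above:
from google.cloud import bigquery

client = bigquery.Client()
table_ref = client.dataset("DATASET").table("TABLE")
client.delete_table(table_ref)  # removes the intermediate table once the extract job has finished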
Upvotes: 15
Reputation: 14791
BigQuery does not support writing its query results directly to GCS. You will have to write the results to a table, and then export the table to GCS after it's been materialised. You could possibly use Cloud Composer to orchestrate this for you.
Or, you could use a Dataflow pipeline to achieve your desired result in one go, but this is a bit more work and will cost more money. The idea would be to write a pipeline that reads from BigQuery using your SQL query and then writes the results to GCS. It will also be slower, though.
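A rough sketch of what such a pipeline could look like with the Apache Beam Python SDK; the project, bucket and query are made-up placeholders, and a real pipeline would need proper CSV quoting/escaping and column-order handling:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options; add runner='DataflowRunner', region, etc. to run on Dataflow.
options = PipelineOptions(
    project='YOUR_PROJECT',
    temp_location='gs://YOUR_BUCKET/tmp',
)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Run the SQL query directly against BigQuery; each element is a dict per row.
        | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            query='SELECT * FROM `YOUR_PROJECT.YOUR_DATASET.emp`',
            use_standard_sql=True)
        # Naively turn each row dict into a CSV line (no quoting/escaping in this sketch).
        | 'ToCsvLine' >> beam.Map(lambda row: ','.join(str(v) for v in row.values()))
        # Write sharded text files to GCS.
        | 'WriteToGCS' >> beam.io.WriteToText(
            'gs://YOUR_BUCKET/query-results', file_name_suffix='.csv')
    )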
Upvotes: 8