Reputation:
I'm trying to automate JSON data upload to BigQuery using two Cloud Functions, which both deploy successfully, and a Cloud Scheduler job, which runs successfully. After the scheduler runs, the data gets uploaded to my Cloud Storage bucket, but it never gets loaded into BigQuery.
Below are my code and JSON data:
# function 1 triggered by http
import json

import requests
from google.cloud import storage


def function(request):
    url = "https://api...."
    headers = {"Content-Type": "application/json",
               "Authorization": "..."}
    response = requests.get(url, headers=headers)
    json_data = response.json()
    pretty_json = json.dumps(json_data, indent=4, sort_keys=True)

    storage_client = storage.Client()
    bucket = storage_client.bucket("bucket_name")
    blob = bucket.blob("blob_name")
    blob.upload_from_string(pretty_json)
# function 2 triggered by cloud storage -> event type finalize/create
from google.cloud import bigquery


def function_2(data, context):
    client = bigquery.Client()
    table_id = "booming-post-322920:dataset_name.table_name"

    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField("order_items", "INTEGER"),
        bigquery.SchemaField("created_at", "TIMESTAMP"),
        .....,
        bigquery.SchemaField("updated_at", "TIMESTAMP")
    ]
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON

    uri = 'gs://bucket_name/blob_name'

    load_job = client.load_table_from_uri(
        uri,
        table_id,
        location="US",
        job_config=job_config
    )
    load_job.result()
This is what my JSON data (pretty_json) looks like:
{
    "records": [
        {
            "active": null,
            "approved": null,
            "buyer": [
                1
            ],
            "cancel_reason": null,
            "cancelled": null,
            "chef": [
                1
            ],
            "completed": null,
            "created_at": "2021-07-15T17:44:31.064Z",
            ...
Please advise.
Upvotes: 4
Views: 907
Reputation: 53461
I think the main problem is the format of your JSON file: you are specifying newline-delimited JSON format (bigquery.SourceFormat.NEWLINE_DELIMITED_JSON), as required by BigQuery, but your JSON doesn't conform to that format.
Please, consider the following modifications to your first function:
def function(request):
    url = "https://api...."
    headers = {"Content-Type": "application/json",
               "Authorization": "..."}
    response = requests.get(url, headers=headers)
    json_data = response.json()

    # Serialize each record as one JSON object per line (newline-delimited JSON)
    records = [json.dumps(record) for record in json_data["records"]]
    records_data = "\n".join(records)

    storage_client = storage.Client()
    bucket = storage_client.bucket("bucket_name")
    blob = bucket.blob("blob_name")
    blob.upload_from_string(records_data)
Your JSON will now look like the following:
{"active": null, "approved": null, "buyer": [1], "cancel_reason": null, "cancelled": null, "chef": [1], "completed": null, "created_at": "2021-07-15T17:44:31.064Z", "delivery": false, "delivery_address": null, "delivery_fee": null, "delivery_instructions": null, "discount": 0, "id": 1, "name": "Oak's Order", "notes": null, "order_delivery_time": null, "order_id": null, "order_ready_time": null, "order_submitted_time": null, "paid": null, "pickup_address": "", "promo_applied": null, "promo_code": null, "rated": null, "ratings": null, "review": null, "seller": [1], "status": "In Process", "tax": null, "tip": 0, "total": null, "type": "Pick Up", "updated_at": "2021-07-15T17:44:31.064Z"}
{"active": null, "approved": null, "buyer": [2], "cancel_reason": null, "cancelled": null, "chef": [1], "completed": null, "created_at": "2021-07-15T17:52:53.729Z", "delivery": false, "delivery_address": null, "delivery_fee": null, "delivery_instructions": null, "discount": 0, "id": 2, "name": "Shuu's Order", "notes": null, "order_delivery_time": null, "order_id": null, "order_ready_time": null, "order_submitted_time": null, "paid": null, "pickup_address": "", "promo_applied": null, "promo_code": null, "rated": null, "ratings": null, "review": null, "seller": [1], "status": "In Process", "tax": null, "tip": 0, "total": null, "type": "Pick Up", "updated_at": "2021-07-15T17:52:53.729Z"}
In addition, in your second function, as also pointed out by @CaioT in their comment, you need to change your function signature to accept two arguments, event and context, according to the GCS storage trigger event definition.
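For illustration, here is a minimal sketch of such an entry point (assuming a 1st-gen background function with the finalize trigger you describe; the parameter names themselves are arbitrary, only the two-argument shape matters):

def function_2(event, context):
    # "event" is a dict with the metadata of the Cloud Storage object
    # that triggered the function, e.g. its bucket and object name.
    bucket_name = event["bucket"]
    blob_name = event["name"]
    uri = f"gs://{bucket_name}/{blob_name}"
    # ... then build the LoadJobConfig and call client.load_table_from_uri(uri, table_id, ...)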
Also, please review the definition of the order_items field in your BigQuery schema definition; according to your JSON, that field does not exist.
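For illustration only, a schema fragment consistent with the sample record you posted could look like the following (the field names and types are assumptions inferred from your JSON, not a definitive schema; note that array fields such as buyer and chef would need mode="REPEATED"):

job_config.schema = [
    bigquery.SchemaField("buyer", "INTEGER", mode="REPEATED"),   # "buyer": [1]
    bigquery.SchemaField("chef", "INTEGER", mode="REPEATED"),    # "chef": [1]
    bigquery.SchemaField("created_at", "TIMESTAMP"),
    bigquery.SchemaField("updated_at", "TIMESTAMP"),
    # ... the remaining fields from your JSON, but no "order_items"
]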
Pay attention to the limitations imposed by BigQuery when importing JSON data as well, especially when dealing with timestamps.
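The created_at and updated_at values in your sample ("2021-07-15T17:44:31.064Z") should already be parseable as TIMESTAMP; if your API ever returned timestamps in a format BigQuery rejects, you could normalize them before uploading to Cloud Storage. A minimal sketch, assuming ISO-8601 strings with a trailing "Z" (the helper name is just an example):

from datetime import datetime, timezone

def to_bq_timestamp(value):
    # Parse e.g. "2021-07-15T17:44:31.064Z" and re-emit it as
    # "2021-07-15 17:44:31.064000+00:00", another form BigQuery accepts.
    dt = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    return dt.isoformat(sep=" ")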
Finally, be sure your function has the necessary permissions to interact with BigQuery.
By default, at runtime your function will assume your App Engine service account, although you can provide a specific service account as well. Be sure that in any case the service account has the necessary permissions over BigQuery and your BigQuery table. Basically, your service account must have the bigquery.user role and be a WRITER (or, equivalently, bigquery.dataEditor) on your dataset. Please see the examples provided in the GCP documentation.
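For example, dataset-level WRITER access could be granted with the BigQuery Python client along these lines (a sketch only; the dataset ID and service account email are placeholders you would replace with your own):

from google.cloud import bigquery

client = bigquery.Client()
dataset = client.get_dataset("dataset_name")  # placeholder dataset ID

entries = list(dataset.access_entries)
entries.append(
    bigquery.AccessEntry(
        role="WRITER",
        entity_type="userByEmail",
        entity_id="function-sa@booming-post-322920.iam.gserviceaccount.com",  # placeholder
    )
)
dataset.access_entries = entries
client.update_dataset(dataset, ["access_entries"])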
Upvotes: 2