JJZ

Reputation: 395

How to set up job dependencies in Google BigQuery?

I have a few jobs. Say one is loading a text file from a Google Cloud Storage bucket into a BigQuery table, and another one is a scheduled query that copies data from one table to another with some transformation. I want the second job to depend on the success of the first one. How do we achieve this in BigQuery, if it is possible to do so at all?

Many thanks.

Best regards,

Upvotes: 3

Views: 2898

Answers (4)

Alessandro

Reputation: 655

You want to use an orchestration tool, especially if you want to set up these tasks as recurring jobs. We use Google Cloud Composer, a managed service based on Airflow, for workflow orchestration, and it works great. It comes with automatic retries, monitoring, alerting, and much more.

You might want to give it a try.
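
For illustration, a minimal Composer/Airflow DAG for this pattern might look like the sketch below. It assumes the apache-airflow-providers-google package is installed; the bucket, dataset and table names are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id="load_then_transform",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Job 1: load the text file from the GCS bucket into a staging table
    load_file = GCSToBigQueryOperator(
        task_id="load_file",
        bucket="my-bucket",                     # placeholder bucket
        source_objects=["exports/data.csv"],    # placeholder object
        destination_project_dataset_table="my_project.my_dataset.staging_table",
        source_format="CSV",
        write_disposition="WRITE_TRUNCATE",
    )

    # Job 2: the transformation query that copies data into the final table
    transform = BigQueryInsertJobOperator(
        task_id="transform",
        configuration={
            "query": {
                "query": "SELECT ... FROM my_dataset.staging_table",  # your transformation SQL
                "destinationTable": {
                    "projectId": "my_project",
                    "datasetId": "my_dataset",
                    "tableId": "final_table",
                },
                "writeDisposition": "WRITE_APPEND",
                "useLegacySql": False,
            }
        },
    )

    # Airflow only starts the second job after the first one succeeds
    load_file >> transform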

Upvotes: 2

Browny Lin

Reputation: 2507

Basically, you can use Cloud Logging to observe almost all kinds of operations in GCP.

BigQuery is no exception. When a query job completes, you can find the corresponding log entry in the Logs Viewer.

The next question is how to pinpoint the exact query you want. One way to achieve this is to use a labeled query (that is, attach labels to your query) [1].

For example, you can use the bq command below to issue a query with the foo:bar label:

bq query \
--nouse_legacy_sql \
--label foo:bar \
'SELECT COUNT(*) FROM `bigquery-public-data`.samples.shakespeare'

Then, when you go to the Logs Viewer and enter the log filter below, you will find exactly the log entry generated by the query above.

resource.type="bigquery_resource"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.labels.foo="bar"

The next question is how to emit an event based on this log entry to trigger the next workload. That is where Cloud Pub/Sub comes into play.

Two ways to publish an event based on a log pattern are:

  1. Log Routers: set a Pub/Sub topic as the destination [2]
  2. Log-based Metrics: create an alert policy whose notification channel is Pub/Sub [3]

So, the next workload can subscribe to the Pub/Sub topic, and be triggered when the previous query has completed.
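
As a minimal sketch of the subscribing side, assuming the log sink with the filter above already routes to a Pub/Sub topic, the next workload could be a Pub/Sub-triggered Cloud Function (Python background-function style) along these lines; the SQL and table names are placeholders.

import base64
import json

from google.cloud import bigquery


def on_first_job_completed(event, context):
    """Pub/Sub-triggered Cloud Function: runs once the labeled query's
    jobCompletedEvent log entry is routed to this function's topic."""
    # The routed log entry arrives base64-encoded in the Pub/Sub message
    log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    print("Previous query finished, log insertId:", log_entry.get("insertId"))

    # Kick off the dependent workload (placeholder transformation query)
    client = bigquery.Client()
    job = client.query(
        "INSERT INTO my_dataset.final_table "
        "SELECT ... FROM my_dataset.staging_table"
    )
    job.result()  # blocks until the second job finishes, raises on error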

Hope this helps ~

[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration
[2] https://cloud.google.com/logging/docs/routing/overview
[3] https://cloud.google.com/logging/docs/logs-based-metrics

Upvotes: 1

Adrian

Reputation: 2113

It is possible to address your scenario with either Cloud Functions (CF) or with a scheduler (Airflow). The first approach is event-driven, so your data gets crunched immediately. With a scheduler, expect a data availability delay.

As has been stated, once you submit a BigQuery job you get back a job ID, which needs to be checked until the job completes. Then, based on the status, you can handle the on-success or on-failure post-actions respectively.
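
A rough sketch of that submit-and-check flow with the Python client library (the SQL and the post-action helpers are placeholders):

from google.cloud import bigquery

client = bigquery.Client()

# Submit the first job and keep its job ID
job = client.query("SELECT ... FROM my_dataset.staging_table")  # placeholder SQL
print("Submitted job:", job.job_id)

try:
    job.result()           # blocks until the job completes, raises on failure
except Exception as err:
    handle_failure(err)    # hypothetical on-failure post action
else:
    run_next_job()         # hypothetical on-success post action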

If you were to develop a CF, note that there are certain limitations, like the execution time (max 9 min), which you would have to address in case the BigQuery job takes more than 9 min to complete. Another challenge with CF is idempotency: making sure that if the same data-file event comes in more than once, the processing does not result in duplicate data.
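
One way to approach the idempotency concern, sketched below under the assumption that you track loaded files in a bookkeeping table (my_dataset.processed_files is a placeholder), is to key the processing on the file's bucket/name/generation from the event and skip files that were already handled.

from google.cloud import bigquery

client = bigquery.Client()


def already_processed(bucket: str, name: str, generation: str) -> bool:
    """Look up the file event in a bookkeeping table so a repeated event is a no-op."""
    # my_dataset.processed_files is a placeholder table with (bucket, name, generation)
    sql = (
        "SELECT COUNT(*) AS n FROM my_dataset.processed_files "
        "WHERE bucket = @bucket AND name = @name AND generation = @generation"
    )
    job_config = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("bucket", "STRING", bucket),
        bigquery.ScalarQueryParameter("name", "STRING", name),
        bigquery.ScalarQueryParameter("generation", "STRING", generation),
    ])
    rows = client.query(sql, job_config=job_config).result()
    return next(iter(rows)).n > 0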

Alternatively, you can consider using an event-driven serverless open-source project like BqTail - a Google Cloud Storage BigQuery loader with post-load transformation.

Here is an example of a BqTail rule.

rule.yaml

When:
  Prefix: "/mypath/mysubpath"
  Suffix: ".json"
Async: true
Batch:
  Window:
    DurationInSec: 85
Dest:
  Table: bqtail.transactions
  Transient:
    Dataset: temp
    Alias: t
  Transform:
    charge: (CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)
  SideInputs:
    - Table: bqtail.fees
      Alias: f
      'On': t.fee_id = f.id
OnSuccess:
  - Action: query
    Request:
      SQL: SELECT
        DATE(timestamp) AS date,
        sku_id,
        supply_entity_id,
        MAX($EventID) AS batch_id,
        SUM( payment) payment,
        SUM((CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)) charge,
        SUM(COALESCE(qty, 1.0)) AS qty
        FROM $TempTable t
        LEFT JOIN bqtail.fees f ON f.id = t.fee_id
        GROUP BY 1, 2, 3
      Dest: bqtail.supply_performance
      Append: true
    OnFailure:
      - Action: notify
        Request:
          Channels:
            - "#e2e"
          Title: Failed to aggregate data to supply_performance
          Message: "$Error"
    OnSuccess:
      - Action: query
        Request:
          SQL: SELECT CURRENT_TIMESTAMP() AS timestamp, $EventID AS job_id
          Dest: bqtail.supply_performance_batches
          Append: true
      - Action: delete

Upvotes: 2

Pentium10

Reputation: 207838

Right now a developer needs to put together the chain of operations. It can be done either using Cloud Functions (supports Node.js, Go, Python) or via a Cloud Run container (supports the gcloud API and any programming language).

Basically you need to:

  1. issue a job
  2. get the job ID
  3. poll the job ID
  4. when the job is finished, trigger the other steps

If using Cloud Functions:

  1. place the file into a dedicated GCS bucket
  2. set up a GCF that monitors that bucket; when a new file is uploaded, it executes a function that imports the file into BigQuery - wait until the operation ends
  3. at the end of the GCF you can trigger other functions for the next step (see the sketch below)
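
A minimal sketch of such a GCS-triggered function (background-function style; the destination table and the Pub/Sub topic used to trigger the next step are placeholders):

from google.cloud import bigquery, pubsub_v1

bq = bigquery.Client()
publisher = pubsub_v1.PublisherClient()
NEXT_STEP_TOPIC = publisher.topic_path("my-project", "load-finished")  # placeholder topic


def on_file_uploaded(event, context):
    """GCS-triggered Cloud Function: import the new file, then trigger the next step."""
    uri = f"gs://{event['bucket']}/{event['name']}"

    # Issue the load job and wait until the operation ends (raises on failure)
    load_job = bq.load_table_from_uri(
        uri,
        "my_project.my_dataset.staging_table",  # placeholder destination table
        job_config=bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV),
    )
    load_job.result()

    # The load succeeded: trigger the next function/step via Pub/Sub
    publisher.publish(NEXT_STEP_TOPIC, b"", job_id=load_job.job_id)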

Another use case with Cloud Functions:

A: a trigger starts the GCF
B: the function executes the query (copies data to another table)
C: it gets a job ID and fires another function with a bit of delay

I: a function gets the job ID
J: it polls - is the job ready?
K: if not ready, it fires itself again with a bit of delay
L: if ready, it triggers the next step - could be a dedicated function or a parameterized function (a rough sketch follows)
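
A rough sketch of the I-K polling loop, assuming the poller is a Pub/Sub-triggered function that re-publishes to its own topic to fire itself again (the topic name and the next-step/notify helpers are hypothetical):

import base64
import json
import time

from google.cloud import bigquery, pubsub_v1

bq = bigquery.Client()
publisher = pubsub_v1.PublisherClient()
SELF_TOPIC = publisher.topic_path("my-project", "poll-bq-job")  # placeholder topic


def poll_job(event, context):
    """Pub/Sub-triggered Cloud Function implementing steps I-L."""
    msg = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    job = bq.get_job(msg["job_id"])          # I: a function gets a job ID

    if job.state != "DONE":                  # J: polls - is the job ready?
        time.sleep(30)                       # K: a bit of delay...
        publisher.publish(SELF_TOPIC, json.dumps(msg).encode("utf-8"))  # ...then fire itself again
        return

    if job.error_result:                     # L: ready - trigger the next step
        notify_failure(job.error_result)     # hypothetical failure handler
    else:
        trigger_next_step(msg)               # hypothetical next-step trigger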

Upvotes: 2
