Mat

Reputation: 91

BigQuery cron job without a web service

Is it possible to run a script that processes user data without running a Google App Engine web service?

It works well with smaller scripts, but when my scripts run for about 40 minutes I get this error: DeadlineExceededError

My temporary fix was to run the Python script from the command line with Windows Task Scheduler on a Windows VM.

Edit: code added

import time

import googleapiclient.discovery
import webapp2
from oauth2client.service_account import ServiceAccountCredentials

jobs = []
jobs_status = []
jobs_error = []
# The project id whose datasets you'd like to list
PROJECT_NUMBER = 'project'
scope = ('https://www.googleapis.com/auth/bigquery',
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/drive',
         'https://spreadsheets.google.com/feeds')

credentials = ServiceAccountCredentials.from_json_keyfile_name('client_secrets.json', scope)

# Create the bigquery api client
service = googleapiclient.discovery.build('bigquery', 'v2', credentials=credentials)

def load_logs(source):
    # Stream one error record into the test.test_log table.
    body = {"rows": [
        {"json": source}
    ]}

    response = service.tabledata().insertAll(
        projectId=PROJECT_NUMBER,
        datasetId='test',
        tableId='test_log',
        body=body).execute()
    return response

def job_status():
    # Rebuild the status and error lists from scratch on every poll so
    # stale entries from earlier polls don't accumulate.
    del jobs_status[:]
    del jobs_error[:]
    for job in jobs:
        _jobId = job['jobReference']['jobId']
        status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=_jobId).execute()
        jobs_status.append(status['status']['state'])
        if 'errorResult' in status['status']:
            query = str(status['configuration']['query']['query'])
            message = str(status['status']['errorResult']['message'])
            jobs_error.append({"query": query, "message": message})
    return jobs_status


def check_statuses():
    # Poll until every job reports DONE, sleeping between polls so the
    # loop doesn't hammer the API.
    while True:
        if all(state == 'DONE' for state in job_status()):
            return
        time.sleep(5)


def insert(query, tableid, disposition):
    # Submit an asynchronous query job that writes its results to the
    # given destination table.
    job_body = {
     "configuration": {
      "query": {
       "query": query,
       "useLegacySql": True,
       "destinationTable": {
        "datasetId": "test",
        "projectId": "project",
        "tableId": tableid
       },
       "writeDisposition": disposition
      }
     }
    }

    r = service.jobs().insert(
        projectId=PROJECT_NUMBER,
        body=job_body).execute()
    jobs.append(r)
    return r



class MainPage(webapp2.RequestHandler):
    def get(self):
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p1', "WRITE_TRUNCATE")
        check_statuses()
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p2', "WRITE_TRUNCATE")
        query = "SELECT * FROM [gdocs_users.user_%s]"
        for i in range(1, 1000):
            if i <= 600:
                insert(query % str(i).zfill(4), 'users_data_p1', "WRITE_APPEND")
            else:
                insert(query % str(i).zfill(4), 'users_data_p2', "WRITE_APPEND")
        check_statuses()  # wait for the appended jobs so their errors are collected
        for error in jobs_error:
            load_logs(error)


app = webapp2.WSGIApplication([
    ('/', MainPage),
], debug=True)

Upvotes: 2

Views: 236

Answers (2)

BrettJ

Reputation: 6841

By default, App Engine services use automatic scaling, which has a 60-second limit for HTTP requests and a 10-minute limit for task queue requests. If you change your service to use basic or manual scaling, then a task queue request can run for up to 24 hours.

It sounds like you likely only need one instance for this work, so consider creating a second service in addition to the default one. In a subfolder named bqservice, create an app.yaml with the following settings, which use basic scaling with at most one instance:

# bqservice/app.yaml
# Possibly use a separate service for your BQ code than
# the rest of your app:
service: bqservice
runtime: python27
api_version: 1
# Keep the low-memory, low-cost B1 instance class.
instance_class: B1
# Limit to one instance to keep costs down; the free tier includes
# 8 instance hours. Basic scaling still scales to 0 when not in use.
basic_scaling:
  max_instances: 1

# Handlers:
handlers:
- url: /.*
  script: main.app

Then create a cron.yaml in the same service to schedule your script to run; a sketch of one follows the handler below. With the example configuration above, you would put your BigQuery logic into a main.py file with a WSGI app defined in it:

# bqservice/main.py
import webapp2

class CronHandler(webapp2.RequestHandler):

    def get(self):  # App Engine cron issues GET requests
        # Handle your cron work
        # ....
        pass

app = webapp2.WSGIApplication([
    #('/', MainPage),  # If you needed other handlers
    ('/mycron', CronHandler),
], debug=True)
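For reference, the cron.yaml might look like the following sketch; the /mycron path matches the handler above, and the daily schedule is just an assumption:

# bqservice/cron.yaml -- a sketch; pick a schedule that fits your job
cron:
- description: daily BigQuery batch job
  url: /mycron
  schedule: every 24 hours
  target: bqservice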

You could fold this all into the default service if you don't plan to use the App Engine app for anything else. If you keep it as a separate service in addition to the default one, you'll need to deploy something to the default service first, even if it is just a simple app.yaml with static files.
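If you go that route, the default service can be a bare placeholder; a minimal sketch, assuming you only serve a static index page:

# app.yaml (default service) -- placeholder so the default service exists
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /
  static_files: static/index.html
  upload: static/index.html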

Upvotes: 2

Felipe Hoffa

Reputation: 59175

Most BigQuery operations can run asynchronously. Can you show us your code?

For example, from the Python BigQuery docs:

import uuid

from google.cloud import bigquery

def query(query):
    client = bigquery.Client()
    # Starts the query job without blocking.
    query_job = client.run_async_query(str(uuid.uuid4()), query)

    query_job.begin()
    query_job.result()  # Wait for job to complete

That's an asynchronous job, and the code is choosing to wait for the query to complete. Instead of waiting, get the job id after begin() and enqueue a Task Queue task that checks on that job's results later.
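For example, here is a sketch of that pattern built on the question's own helpers (insert, service, and PROJECT_NUMBER come from the question's code; the /checkjob URL and the 60-second countdown are assumptions):

import webapp2
from google.appengine.api import taskqueue

def start_query_deferred(query):
    # insert() is the question's helper; it returns the job resource.
    job = insert(query, 'users_data_p1', "WRITE_TRUNCATE")
    job_id = job['jobReference']['jobId']
    # Check on the job in ~60 seconds instead of blocking the request.
    taskqueue.add(url='/checkjob', params={'job_id': job_id}, countdown=60)

class CheckJobHandler(webapp2.RequestHandler):
    def post(self):  # push queue tasks arrive as POST requests
        job_id = self.request.get('job_id')
        status = service.jobs().get(projectId=PROJECT_NUMBER,
                                    jobId=job_id).execute()
        if status['status']['state'] != 'DONE':
            # Not done yet: re-enqueue and poll again later.
            taskqueue.add(url='/checkjob', params={'job_id': job_id},
                          countdown=60)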

Upvotes: 0
