Reputation: 91
Is it possible to run a script that processes user data without running a Google App Engine web service?
With smaller scripts this works well, but when a script runs for about 40 minutes I get a DeadlineExceededError.
My temporary fix was to use the Windows Task Scheduler on a Windows VM and run the Python script from the command line.
Edit: code added
import webapp2
import googleapiclient.discovery
from oauth2client.service_account import ServiceAccountCredentials

jobs = []
jobs_status = []
jobs_error = []

# The project id whose datasets you'd like to list
PROJECT_NUMBER = 'project'

scope = ('https://www.googleapis.com/auth/bigquery',
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/drive',
         'https://spreadsheets.google.com/feeds')
credentials = ServiceAccountCredentials.from_json_keyfile_name('client_secrets.json', scope)

# Create the BigQuery API client
service = googleapiclient.discovery.build('bigquery', 'v2', credentials=credentials)


def load_logs(source):
    """Stream a single error record into the test.test_log table."""
    body = {"rows": [
        {"json": source}
    ]}
    response = service.tabledata().insertAll(
        projectId=PROJECT_NUMBER,
        datasetId='test',
        tableId='test_log',
        body=body).execute()
    return response


def job_status():
    """Poll every submitted job once; collect states and any errors."""
    del jobs_status[:]  # reset so stale states from earlier polls don't linger
    for job in jobs:
        _jobId = job['jobReference']['jobId']
        status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=_jobId).execute()
        jobs_status.append(status['status']['state'])
        if 'errors' in status['status']:
            query = str(status['configuration']['query']['query'])
            message = str(status['status']['errorResult']['message'])
            jobs_error.append({"query": query, "message": message})
    return jobs_status


def check_statuses():
    """Block until every submitted job reports DONE."""
    while True:
        if all('DONE' in state for state in job_status()):
            return


def insert(query, tableid, disposition):
    """Submit an asynchronous query job writing to the given table."""
    job_body = {
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": True,
                "destinationTable": {
                    "datasetId": "test",
                    "projectId": "project",
                    "tableId": tableid
                },
                "writeDisposition": disposition
            }
        }
    }
    r = service.jobs().insert(
        projectId=PROJECT_NUMBER,
        body=job_body).execute()
    jobs.append(r)
    return r


class MainPage(webapp2.RequestHandler):
    def get(self):
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p1', "WRITE_TRUNCATE")
        check_statuses()
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p2', "WRITE_TRUNCATE")
        query = "SELECT * FROM [gdocs_users.user_%s]"
        for i in range(1, 1000):
            if i <= 600:
                insert(query % str(i).zfill(4), 'users_data_p1', "WRITE_APPEND")
            else:
                insert(query % str(i).zfill(4), 'users_data_p2', "WRITE_APPEND")
        for error in jobs_error:
            load_logs(error)


app = webapp2.WSGIApplication([
    ('/', MainPage),
], debug=True)
Upvotes: 2
Views: 236
Reputation: 6841
By default, App Engine services use automatic scaling, which has a 60-second limit for HTTP requests and a 10-minute limit for task queue requests. If you change your service to use basic or manual scaling, then your task queue requests can run for up to 24 hours.
It sounds like you only need one instance for this work, so perhaps create a second service in addition to the default service. Create a bqservice subfolder with the following app.yaml settings, which use basic scaling with a maximum of one instance:
# bqservice/app.yaml
# Possibly use a separate service for your BQ code than
# the rest of your app:
service: bqservice
runtime: python27
api_version: 1

# Keep low memory/cost B1 class?
instance_class: B1

# Limit max instances to 1 to keep costs down. There is an
# 8 instance-hour limit in the free tier. This option still
# scales to 0 when not in use.
basic_scaling:
  max_instances: 1

# Handlers:
handlers:
- url: /.*
  script: main.app
Then create a cron.yaml in the same service to schedule your script to run (an example follows the handler code below). With the example configuration above, you would put your BigQuery logic into a main.py file with a WSGI app defined in it:
# bqservice/main.py
import webapp2

class CronHandler(webapp2.RequestHandler):
    def get(self):  # App Engine cron issues GET requests
        # Handle your cron work
        # ....
        pass

app = webapp2.WSGIApplication([
    #('/', MainPage),  # If you needed other handlers
    ('/mycron', CronHandler),
], debug=True)
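The cron.yaml itself could look something like this; the schedule and description here are placeholders I'm assuming, so adjust them to your workload:
# bqservice/cron.yaml (a minimal sketch; schedule and description are assumptions)
cron:
- description: kick off the BigQuery batch job
  url: /mycron
  schedule: every 24 hours
  target: bqservice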
You could work this all into the default service if you don't plan on using the App Engine app for anything else. If you do this in addition to the default service, you'll need to deploy something to the default service first, even if it is just a simple app.yaml with static files.
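For reference, a bare-bones default-service app.yaml could be as small as this (the static/index.html placeholder page is my assumption):
# app.yaml for the default service (minimal sketch; index.html is assumed)
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /
  static_files: static/index.html
  upload: static/index.html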
Upvotes: 2
Reputation: 59175
Most BigQuery operations can run asynchronously. Can you show us your code?
For example, from the Python BigQuery docs:
import uuid

from google.cloud import bigquery


def query(query):
    client = bigquery.Client()
    query_job = client.run_async_query(str(uuid.uuid4()), query)
    query_job.begin()
    query_job.result()  # Wait for job to complete
That's an asynchronous job, and the code is choosing to wait for the query to complete. Instead of waiting, grab the job id after begin(). You can then enqueue a task with the Task Queue to check for the results of that job later.
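A minimal sketch of that pattern, reusing the discovery-based service client and PROJECT_NUMBER from the question's code (the /check-job route and the 30-second countdown are my assumptions, not an official recipe):
import webapp2
from google.appengine.api import taskqueue

class CheckJobHandler(webapp2.RequestHandler):
    def post(self):
        # Poll the job once; jobs().get is the same call the question uses.
        # `service` and PROJECT_NUMBER are as defined in the question's code.
        job_id = self.request.get('job_id')
        status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=job_id).execute()
        if status['status']['state'] != 'DONE':
            # Not finished yet: re-enqueue this task to check again in 30s
            # instead of blocking a request for the whole job duration.
            taskqueue.add(url='/check-job',
                          params={'job_id': job_id},
                          countdown=30)

app = webapp2.WSGIApplication([
    ('/check-job', CheckJobHandler),
], debug=True)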
Upvotes: 0