Reputation: 49
I am using the script below to extract data from Google Analytics for the last week. I want to automate the date range so that I don't have to change date_ranges every week. I also want to avoid GA sampling the data. Please guide me on the correct way to automate this.
__author__ = '[email protected] (test)'
import argparse
import sys
import csv
import string
import datetime
import json
import time
from apiclient.errors import HttpError
from apiclient import sample_tools
from oauth2client.client import AccessTokenRefreshError
cam_name = sys.argv[1:]
class SampledDataError(Exception): pass
def main(argv):
    # Authenticate and construct the Analytics service.
    service, flags = sample_tools.init(
        argv[0], 'analytics', 'v3', __doc__, __file__,
        scope='https://www.googleapis.com/auth/analytics.readonly')

    # Try to make a request to the API. Print the results or handle errors.
    try:
        profile_id = profile_ids[profile]
        if not profile_id:
            print('Could not find a valid profile for this user.')
        else:
            metrics = argv[1]
            dimensions = argv[2]
            reportName = argv[3]
            sort = argv[4]
            filters = argv[5]
            for start_date, end_date in date_ranges:
                limit = ga_query(service, profile_id, 0,
                                 start_date, end_date, metrics,
                                 dimensions, sort, filters).get('totalResults')
                for pag_index in range(0, limit, 10000):
                    results = ga_query(service, profile_id, pag_index,
                                       start_date, end_date, metrics,
                                       dimensions, sort, filters)
                    # if results.get('containsSampledData'):
                    #     raise SampledDataError
                    print_results(results, pag_index, start_date, end_date, reportName)

    except TypeError as error:
        # Handle errors in constructing a query.
        print('There was an error in constructing your query: %s' % error)
    except HttpError as error:
        # Handle API errors.
        print('Arg, there was an API error: %s : %s' %
              (error.resp.status, error._get_reason()))
    except AccessTokenRefreshError:
        # Handle Auth errors.
        print('The credentials have been revoked or expired, please re-run '
              'the application to re-authorize.')
    except SampledDataError:
        # Force an error if a query ever returns sampled data.
        print('Error: Query contains sampled data!')
def ga_query(service, profile_id, pag_index, start_date, end_date,
             metrics, dimensions, sort, filters):
    return service.data().ga().get(
        ids='ga:' + profile_id,
        start_date=start_date,
        end_date=end_date,
        metrics=metrics,
        dimensions=dimensions,
        sort=sort,
        filters=filters,
        samplingLevel='HIGHER_PRECISION',
        start_index=str(pag_index + 1),
        max_results=str(pag_index + 10000)).execute()
def print_results(results, pag_index, start_date, end_date, reportName):
    """Prints out the results.

    This prints out the profile name, the column headers, and all the rows
    of data.

    Args:
        results: The response returned from the Core Reporting API.
    """
    # Write the header once, on the first page of the first date range.
    if pag_index == 0:
        if (start_date, end_date) == date_ranges[0]:
            print('Profile Name: %s' % results.get('profileInfo').get('profileName'))
            columnHeaders = results.get('columnHeaders')
            cleanHeaders = [str(h['name']) for h in columnHeaders]
            writer.writerow(cleanHeaders)
        print(reportName, 'Now pulling data from %s to %s.' % (start_date, end_date))

    # Print the data table.
    if results.get('rows', []):
        for row in results.get('rows'):
            # Strip non-printable characters from each cell.
            for i in range(len(row)):
                old, new = row[i], str()
                for s in old:
                    new += s if s in string.printable else ''
                row[i] = new
            writer.writerow(row)
    else:
        print('No Rows Found')

    limit = results.get('totalResults')
    print(pag_index, 'of about', int(round(limit, -4)), 'rows.')
    return None
# Map each profile name to its view (profile) ID. To query a single
# profile, leave just one 'profile name': 'id' entry; the loop below
# runs every report for every entry in this dict.
# Brands
profile_ids = {'abc-Mobile': '12345',
               'abc-Desktop': '23456',
               'pqr-Mobile': '34567',
               'pqr-Desktop': '45678',
               'xyz-Mobile': '56789',
               'xyz-Desktop': '67890'}

# One (start, end) tuple per day, so each query covers a single day.
date_ranges = [
    ('2017-01-24', '2017-01-24'),
    ('2017-01-25', '2017-01-25'),
    ('2017-01-26', '2017-01-26'),
    ('2017-01-27', '2017-01-27'),
    ('2017-01-28', '2017-01-28'),
    ('2017-01-29', '2017-01-29'),
    ('2017-01-30', '2017-01-30')
]
for profile in sorted(profile_ids):
    print("Sequence 1", profile)
    # Load the report definitions for this run.
    with open('qwerty.json') as json_data:
        d = json.load(json_data)
    for getThisReport in d["Reports"]:
        print("Sequence 2", getThisReport["ReportName"])
        reportName = getThisReport["ReportName"]
        metrics = getThisReport["Metrics"]
        dimensions = getThisReport["Dimensions"]
        sort = getThisReport["sort"]
        filters = getThisReport["filter"]
        # Replace with the path to the folder where the CSV files will be written.
        path = 'C:\\Projects\\DataExport\\test\\'
        today = time.strftime('%Y%m%d')
        # The output file is named <profile>_<reportName>_<YYYYMMDD>.csv.
        filename = profile + '_' + reportName + '_' + today + '.csv'
        with open(path + filename, 'wt') as f:
            writer = csv.writer(f, delimiter='|', lineterminator='\n',
                                quoting=csv.QUOTE_MINIMAL)
            # argv[0] is sys.argv itself so sample_tools.init() can parse flags.
            args = [sys.argv, metrics, dimensions, reportName, sort, filters]
            if __name__ == '__main__':
                main(args)
    print("Profile done. Next profile...")
print("All profiles done.")
Upvotes: 1
Views: 492
Reputation: 116908
The Core Reporting API supports some interesting things as far as dates go.
All Analytics data requests must specify a date range. If you do not include start-date and end-date parameters in the request, the server returns an error. Date values can be for a specific date by using the pattern YYYY-MM-DD or relative by using today, yesterday, or the NdaysAgo pattern. Values must match [0-9]{4}-[0-9]{2}-[0-9]{2}|today|yesterday|[0-9]+(daysAgo).
So you can do something like:
start_date = '7daysAgo'
end_date = 'today'
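Since your script deliberately queries one day at a time (one (start, end) tuple per day), which also keeps each request small and therefore less likely to be sampled, you may instead want to keep that pattern and generate the tuples with datetime. A minimal sketch; last_n_days is a hypothetical helper, not part of the GA client library:

import datetime

def last_n_days(n=7, lag=2):
    # Hypothetical helper: builds one (start, end) tuple per day for the
    # last n days, skipping the most recent `lag` days while GA finishes
    # processing (see the caveat below).
    end = datetime.date.today() - datetime.timedelta(days=lag)
    days = [end - datetime.timedelta(days=i) for i in range(n)]
    return [(d.isoformat(), d.isoformat()) for d in sorted(days)]

date_ranges = last_n_days()

Drop that in place of your hard-coded date_ranges list and the window moves forward automatically every time the script runs.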
Just remember that data takes 24-48 hours to finish processing, so your figures for today, yesterday, and the day before may not be 100% accurate.
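On the sampling side, your ga_query already passes samplingLevel='HIGHER_PRECISION', and the v3 response reports whether sampling occurred via containsSampledData (with sampleSize and sampleSpace giving the sample counts), so you can re-enable the check you have commented out. A sketch against the same results object:

# Inside the pagination loop, right after each ga_query() call:
if results.get('containsSampledData'):
    raise SampledDataError(
        'Sampled data for %s - %s: %s of %s sessions sampled.'
        % (start_date, end_date,
           results.get('sampleSize'), results.get('sampleSpace')))

Keeping each query to a single day, as you already do, is the other main lever: sampling kicks in when a date range covers too many sessions, so smaller windows make it less likely.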
Upvotes: 1