Reputation: 740
I'm building a Python application that uses the Google Drive APIs. So far the development has gone well, but I have a problem retrieving the entire Google Drive file tree, which I need for two purposes:
For now I have a function that fetches the root of Google Drive, and I can build the tree by recursively calling a function that lists the contents of a single folder, but it is extremely slow and can potentially make thousands of requests to Google, which is unacceptable.
Here the function to get the root:
def drive_get_root():
    """Retrieve a list of File resources starting from the Drive root.

    NOTE(review): files().list with no query returns every file the user
    can see, not only the root folder's children — confirm this is the
    intended behavior.

    Returns:
        List of file resource dictionaries (may be partial if an
        HttpError interrupts paging).
    """
    # Build the service; the driveHelper module handles authentication
    # and credential storage.
    drive_service = build('drive', 'v2', driveHelper.buildHttp())
    result = []
    page_token = None
    while True:
        try:
            param = {}
            if page_token:
                param['pageToken'] = page_token
            files = drive_service.files().list(**param).execute()
            # Accumulate this page of results.
            result.extend(files['items'])
            page_token = files.get('nextPageToken')
            if not page_token:
                break
        # "except X as e" / print() work on Python 2.6+ and Python 3;
        # the old "except X, e" comma syntax is Python-2-only.
        except errors.HttpError as _error:
            print('An error occurred: %s' % _error)
            break
    return result
and here is the one that gets the files from a folder
def drive_files_in_folder(folder_id):
    """Return the File resources contained in a folder.

    Args:
        folder_id: ID of the folder to get files from.

    Returns:
        List of file resource dictionaries (one files().get call per
        child, so this is slow for large folders).
    """
    # Build the service; the driveHelper module handles authentication
    # and credential storage.
    drive_service = build('drive', 'v2', driveHelper.buildHttp())
    result = []
    page_token = None
    while True:
        try:
            param = {}
            if page_token:
                param['pageToken'] = page_token
            children = drive_service.children().list(folderId=folder_id, **param).execute()
            for child in children.get('items', []):
                # children().list only returns child ids, so fetch the
                # full metadata for each one.
                result.append(drive_get_file(child['id']))
            page_token = children.get('nextPageToken')
            if not page_token:
                break
        # py2.6+/py3-compatible except syntax and print call.
        except errors.HttpError as _error:
            print('An error occurred: %s' % _error)
            break
    return result
and, for example, to check whether a file exists I'm currently using this:
def drive_path_exist(file_path, list=False):
    """Recursively check whether a "/"-separated path exists in Drive.

    Args:
        file_path: path like "folder/subfolder/file".
        list: listing (list of file resource dicts) to search in;
            False means start from the Drive root via drive_get_root().
            (Parameter keeps its original name, which shadows the
            builtin, for backward compatibility with keyword callers.)

    Returns:
        The matching file resource dict if the path exists, else False.
    """
    # "is False" instead of "== False": an empty listing ([] or 0) must
    # not be mistaken for the "fetch the root" sentinel.
    if list is False:
        list = drive_get_root()
    # str.split replaces the long-deprecated string.split helper.
    file_path = file_path.split("/")
    if len(file_path) == 1:
        # Last path component: return the resource dict if present.
        for elem in list:
            if elem["title"] == file_path[0]:
                return elem
        return False
    # Otherwise find the folder matching the first component and recurse
    # into its contents with the remainder of the path.
    for elem in list:
        if elem["title"] == file_path[0]:
            return drive_path_exist("/".join(file_path[1:]),
                                    drive_files_in_folder(elem["id"]))
    # Explicit False instead of the original implicit None fall-through.
    return False
Any idea how to solve my problem? I saw a few discussions here on Stack Overflow, and in some answers people wrote that this is possible, but of course they didn't say how!
Thanks
Upvotes: 12
Views: 16421
Reputation: 11
I stumbled into this problem recently, because I had to verify the existence of many, many files on Google Drive inside a specified folder and its children.
I created a few classes to handle this:
driveServiceFetcher: Class that handles requests from Google, like getting ALL folders on Drive or fetching all files on specified folders. Nothing very different from what you have here (and based on some solutions I read from this thread)
from fileinput import filename
import os.path
from typing import final
from urllib import response
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from pyparsing import opAssoc
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
class driveServiceFetcher:
    """Wraps an authenticated Drive v3 service with bulk listing queries
    (all folders in the drive, all files under a set of parent folders).
    """

    def __init__(self) -> None:
        self._credentials = self.getDriveCredentials()
        self.service = build('drive', 'v3', credentials=self._credentials)

    def getDriveCredentials(self):
        """Load cached OAuth credentials from token.json, refreshing them
        or running the installed-app flow when missing or expired.

        Returns:
            google.oauth2.credentials.Credentials for the SCOPES above.
        """
        creds = None
        # token.json stores the user's access and refresh tokens; it is
        # created automatically the first time the auth flow completes.
        if os.path.exists('token.json'):
            creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run.
            with open('token.json', 'w') as token:
                token.write(creds.to_json())
        return creds

    def get_all_folders_in_drive(self):
        """Return every non-trashed folder in My Drive.

        Returns:
            List of dicts with id, name and parents. Best-effort: on an
            API error the folders fetched so far are still returned.
        """
        driveFolders = []
        try:
            page_token = None
            max_allowed_page_size = 1000
            foldersQuery = "trashed = false and mimeType = 'application/vnd.google-apps.folder'"
            while True:
                results = self.service.files().list(
                    pageSize=max_allowed_page_size,
                    fields="nextPageToken, files(id, name, parents)",
                    includeItemsFromAllDrives=False, supportsAllDrives=False,
                    corpora='user',
                    pageToken=page_token,
                    q=foldersQuery).execute()
                driveFolders.extend(results.get('files', []))
                page_token = results.get('nextPageToken', None)
                if page_token is None:
                    break
        except HttpError as error:
            # Was a bare "except: pass" that hid every failure; at least
            # report the API error while keeping best-effort semantics.
            print(f'get_all_folders_in_drive failed: {error}')
        finally:
            # Returning from finally preserves the original contract of
            # always handing back whatever was collected.
            return driveFolders

    def get_all_files_in_folders(self, parentFoldersIds):
        """Return all non-trashed, non-folder files whose parent is one
        of parentFoldersIds.

        Args:
            parentFoldersIds: iterable of Drive folder ids.

        Returns:
            List of dicts with id, name, mimeType and parents (the
            original docstring claimed a dict; it is a list).
        """
        files = []
        page_token = None
        max_allowed_page_size = 1000
        parentsQuery = buildAllParentsQuery(parentFoldersIds)
        filesQuery = f"mimeType != 'application/vnd.google-apps.folder' and trashed = false and ({parentsQuery})"
        while True:
            results = self.service.files().list(
                pageSize=max_allowed_page_size,
                fields="nextPageToken, files(id, name, mimeType, parents)",
                includeItemsFromAllDrives=False, supportsAllDrives=False,
                pageToken=page_token,
                q=filesQuery).execute()
            files.extend(results.get('files', []))
            page_token = results.get('nextPageToken', None)
            if page_token is None:
                break
        return files
def buildAllParentsQuery(parentIds):
    """Build an OR-joined Drive query clause: '"id1" in parents or "id2" in parents ...'."""
    quoted = ['"{0}"'.format(pid) for pid in parentIds]
    return ' in parents or '.join(quoted) + ' in parents'

if __name__ == '__main__':
    pass
driveExplorer: Constructs the Google Drive hierarchy, so we can fetch all the folders once and then don't have to query constantly. I use this to get all the child nodes recursively for any folder I want; that way I can access their ids and use them in my serviceFetcher.get_all_files_in_folders()
import driveNode
class driveExplorer:
    """In-memory mirror of the Drive folder hierarchy.

    Built once from a flat list of driveNode objects so that later
    parent/child lookups need no further API calls.
    """

    def __init__(self, rootId, folderNodes:list[driveNode.driveNode]):
        ##ConstructNodes
        self.rootId = rootId
        self._flatHierarchy = folderNodes
        self._createTree()
        self._assignRootNode()
        # "is None" instead of "== None" (identity check for the sentinel).
        if self.rootNode is None:
            raise DriveExplorerException
        self._optimizeFlattenedHierarchy()

    def _createTree(self):
        # Link every node to its parent. Linear scan per node is O(n^2);
        # switch to a dict keyed by id if folder counts get large.
        for n in self._flatHierarchy:
            parent = self.getNodeWithId(n.parentId)
            if parent is not None:
                parent.children.append(n)
                n.parent = parent

    def _assignRootNode(self):
        self.rootNode = self.getNodeWithId(self.rootId)

    def _optimizeFlattenedHierarchy(self):
        # Drop nodes that are not reachable from the root.
        print(f"Files in flat hierarchy PRE optimization: {len(self._flatHierarchy)}")
        rootChildren = [self.rootNode]
        rootChildren.extend(self.getAllChildrenNodes(self.rootNode))
        self._flatHierarchy = rootChildren
        print(f"Files in flat hierarchy POST optimization: {len(self._flatHierarchy)}")

    def getAllChildrenNodes(self, node:driveNode.driveNode):
        """Return all descendants of node, depth-first."""
        nodes = []
        for n in node.children:
            nodes.append(n)
            nodes.extend(self.getAllChildrenNodes(n))
        return nodes

    def getNodeWithId(self, nodeId):
        """Return the node with the given id, or None if absent."""
        for n in self._flatHierarchy:
            if n.id == nodeId:
                return n
        return None

    ##TODO: Delete? Unused
    def getNodesWithParent(self, parentId):
        """Return every node whose parentId matches."""
        # BUG FIX: was "nodes = list[driveNode.driveNode]", which binds a
        # typing generic alias (not a list), so nodes.append(...) raised.
        nodes = []
        for n in self._flatHierarchy:
            if n.parentId == parentId:
                nodes.append(n)
        return nodes

    def getNodesWithName(self, nodeName):
        """Return every node whose name matches nodeName."""
        nodes = []
        for n in self._flatHierarchy:
            if n.name == nodeName:
                nodes.append(n)
        return nodes

    def getDirectory(self, node:driveNode.driveNode):
        """Return the path from the root down to node, as a list of nodes
        (stops early if an orphan with no parent is reached)."""
        directory = [node]
        currentNode = node
        while currentNode.id != self.rootId:
            if currentNode.parent is None:
                break
            directory.insert(0, currentNode.parent)
            currentNode = currentNode.parent
        return directory
class DriveExplorerException(Exception):
    """Raised by driveExplorer when the root node cannot be found in the supplied hierarchy."""
    pass
def folderListToNodeList(folders):
    """Convert Drive API folder dicts into driveNode objects.

    Only the first entry of 'parents' is used (modern Drive files have a
    single parent); nodes without parents get parentId=None.
    """
    nodes = []
    for folder in folders:
        parents = folder.get('parents', None)
        parent = parents[0] if parents is not None else None
        nodes.append(driveNode.driveNode(folder["name"], folder["id"], parent))
    return nodes

if __name__ == '__main__':
    pass
driveNode: A class with the Google Drive file info: Name, Id, parentId, array of Children(nodes) and the parent(node)
class driveNode:
    """A single Drive entry (folder or file) in the locally mirrored tree."""
    def __init__(self, name, id, parentId) -> None:
        self.name = name            # Drive file/folder name
        self.id = id                # Drive file id
        self.parentId = parentId    # id of the parent folder (or None)
        self.children = []          # child driveNodes, linked by driveExplorer
        self.parent = None          # parent driveNode, linked by driveExplorer
if __name__ == '__main__':
    pass
That way I can look for files (recursive) by doing something like this:
##ROOT_NODE_ID, the drive explorer requires a folder to be the "parent node"
driveFetcher = driveServiceFetcher()
allFolders = driveFetcher.get_all_folders_in_drive()
explorer = driveExplorer(ROOT_NODE_ID, folderListToNodeList(allFolders))
...
nodesToCrawl = explorer.getAllChildrenNodes(folder)
idsToCrawl = []
for n in nodesToCrawl:
idsToCrawl.append(n.id)
files = driveFetcher.get_all_files_in_folders(idsToCrawl)
Upvotes: 1
Reputation: 22286
In order to build a representation of a tree in your app, you need to do this ...
If you simply want to check if file-A exists in folder-B, the approach depends on whether the name "folder-B" is guaranteed to be unique.
If it's unique, just do a FilesList query for title='file-A', then do a Files Get for each of its parents and see if any of them are called 'folder-B'.
You don't say if these files and folders are being created by your app, or by the user with the Google Drive Webapp. If your app is the creator of these files/folders there is a trick you can use to restrict your searches to a single root. Say you have
MyDrive/app_root/folder-C/folder-B/file-A
you can make all of folder-C, folder-B and file-A children of app_root
That way you can constrain all of your queries to include
and 'app_root_id' in parents
NB. A previous version of this answer highlighted that Drive folders were not constrained to an inverted tree hierarchy, because a single folder could have multiple parents. As of 2021, this is no longer true and a Drive File (including Folders, which are simply special files) can only be created with a single parent.
Upvotes: 13
Reputation: 22058
I agree with @pinoyyid - Google drive is not a typical tree structure.
BUT, for printing the folder structure I would still consider using a tree visualization library (for example like treelib).
Below is a full solution for printing your google drive file system recursively.
from treelib import Node, Tree
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
# Authenticate with Google via a local webserver OAuth flow and build
# the pydrive client used by the helpers below.
gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)
### Helper functions ###
def get_children(root_folder_id):
    """Return all non-trashed direct children of the given folder id."""
    # Local renamed from "str", which shadowed the builtin.
    query = "\'" + root_folder_id + "\'" + " in parents and trashed=false"
    return drive.ListFile({'q': query}).GetList()
def get_folder_id(root_folder_id, root_folder_title):
    """Return the id of the first child of root_folder_id whose title
    matches root_folder_title, or None if there is no match."""
    for entry in get_children(root_folder_id):
        if entry['title'] == root_folder_title:
            return entry['id']
def add_children_to_tree(tree, file_list, parent_id):
    """Insert every entry of file_list into tree under parent_id,
    logging each insertion to stdout."""
    for entry in file_list:
        title, node_id = entry['title'], entry['id']
        tree.create_node(title, node_id, parent=parent_id)
        print('parent: %s, title: %s, id: %s' % (parent_id, title, node_id))
### Recursion over all children ###
def populate_tree_recursively(tree, parent_id):
    """Depth-first fill: add parent_id's children to tree, then recurse
    into each child folder/file id."""
    children = get_children(parent_id)
    add_children_to_tree(tree, children, parent_id)
    # Iterating an empty list is a no-op, so no explicit length guard.
    for child in children:
        populate_tree_recursively(tree, child['id'])
### Create tree and start populating from root ###
def main():
    """Build the tree for "your-root-folder" under My Drive and print it."""
    root_title = "your-root-folder"
    root_id = get_folder_id("root", root_title)
    tree = Tree()
    tree.create_node(root_title, root_id)
    populate_tree_recursively(tree, root_id)
    tree.show()

if __name__ == "__main__":
    main()
Upvotes: 1
Reputation: 19835
It will never work like that except for very small trees. You have to rethink your entire algorithm for a cloud app (you have written it like a desktop app where you own the machine), since it will time out easily. You need to mirror the tree beforehand (task queues and datastore), not just to avoid timeouts but also to avoid Drive rate limits, and keep it in sync somehow (register for push notifications, etc.). Not easy at all. I've done a Drive tree viewer before.
Upvotes: 2
Reputation: 99
An easy way to check if a file exist in a specific path is: drive_service.files().list(q="'THE_ID_OF_SPECIFIC_PATH' in parents and title='a file'").execute()
To walk all folders and files:
import sys, os
import socket
import googleDriveAccess
import logging
logging.basicConfig()
FOLDER_TYPE = 'application/vnd.google-apps.folder'
def getlist(ds, q, **kwargs):
    """Fetch every page of a files().list query and merge the 'items'.

    Args:
        ds: Drive service object.
        q: query string passed to files().list.
        **kwargs: extra list() parameters (e.g. maxResults).

    Returns:
        The first page's response dict with 'items' extended by all
        subsequent pages (None is never returned: the loop runs at
        least once).
    """
    result = None
    npt = ''
    # "x is not None", not "not x is None" (PEP 8), and one statement
    # per line instead of the original single-line suites.
    while npt is not None:
        if npt != '':
            kwargs['pageToken'] = npt
        entries = ds.files().list(q=q, **kwargs).execute()
        if result is None:
            result = entries
        else:
            result['items'] += entries['items']
        npt = entries.get('nextPageToken')
    return result
def uenc(u):
    """UTF-8-encode unicode text; pass byte strings through (Python 2)."""
    return u.encode('utf-8') if isinstance(u, unicode) else u
def walk(ds, folderId, folderName, outf, depth):
    """Recursively write the Drive tree to outf: '+' marks folders
    (visited depth-first), '-' marks files, indented by depth."""
    indent = ' ' * depth
    outf.write('%s+%s\n%s %s\n' % (indent, uenc(folderId), indent, uenc(folderName)))
    # Subfolders first, recursing one level deeper into each.
    folder_q = "'%s' in parents and mimeType='%s'" % (folderId, FOLDER_TYPE)
    for folder in getlist(ds, folder_q, maxResults=200)['items']:
        walk(ds, folder['id'], folder['title'], outf, depth + 1)
    # Then the plain files directly inside this folder.
    file_q = "'%s' in parents and mimeType!='%s'" % (folderId, FOLDER_TYPE)
    for f in getlist(ds, file_q, maxResults=200)['items']:
        outf.write('%s -%s\n%s %s\n' % (indent, uenc(f['id']), indent, uenc(f['title'])))
def main(basedir):
    """Walk the whole Drive from 'root' and dump the hierarchy into
    basedir/hierarchy.txt."""
    client = googleDriveAccess.DAClient(basedir)  # clientId=None, script=False
    with open(os.path.join(basedir, 'hierarchy.txt'), 'wb') as outf:
        walk(client.drive_service, 'root', u'root', outf, 0)
if __name__ == '__main__':
    logging.getLogger().setLevel(getattr(logging, 'INFO'))
    try:
        main(os.path.dirname(__file__))
    # "except X as e" works on Python 2.6+ and 3; the original
    # "except (socket.gaierror, ), e" comma form is a syntax error on 3.
    except socket.gaierror as e:
        sys.stderr.write('socket.gaierror')
using googleDriveAccess github.com/HatsuneMiku/googleDriveAccess
Upvotes: 1