Matteo Hertel

Reputation: 740

Python Google Drive API - list the entire drive file tree

I'm building a Python application that uses the Google Drive APIs. So far the development is going well, but I have a problem retrieving the entire Google Drive file tree. I need it for two purposes:

  1. Check if a path exists: if I want to upload test.txt under root/folder1/folder2, I want to check whether the file already exists and, if so, update it.
  2. Build a visual file explorer. I know that Google provides its own (I can't remember the name now, but I know it exists), but I want to restrict the file explorer to specific folders.

For now I have a function that fetches the root of Google Drive, and I can build the tree by recursively calling a function that lists the content of a single folder. But this is extremely slow and can potentially make thousands of requests to Google, which is unacceptable.

Here is the function to get the root:

def drive_get_root():
    """Retrieve a root list of File resources.
       Returns:
         List of dictionaries.
    """
    
    #build the service, the driveHelper module will take care of authentication and credential storage
    drive_service = build('drive', 'v2', driveHelper.buildHttp())
    # the result will be a list
    result = []
    page_token = None
    while True:
        try:
            param = {}
            if page_token:
                param['pageToken'] = page_token
            files = drive_service.files().list(**param).execute()
            #add the files in the list
            result.extend(files['items'])
            page_token = files.get('nextPageToken')
            if not page_token:
                break
        except errors.HttpError as _error:
            print('An error occurred: %s' % _error)
            break
    return result

and here is the one to get the files from a folder:

def drive_files_in_folder(folder_id):
    """Print files belonging to a folder.
       Args:
         folder_id: ID of the folder to get files from.
    """
    #build the service, the driveHelper module will take care of authentication and credential storage
    drive_service = build('drive', 'v2', driveHelper.buildHttp())
    # the result will be a list
    result = []
    #code from google, is working so I didn't touch it
    page_token = None
    while True:
        try:
            param = {}

            if page_token:
                param['pageToken'] = page_token

            children = drive_service.children().list(folderId=folder_id, **param).execute()

            for child in children.get('items', []):
                result.append(drive_get_file(child['id']))

            page_token = children.get('nextPageToken')
            if not page_token:
                break
        except errors.HttpError as _error:
            print('An error occurred: %s' % _error)
            break
    return result

and, for example, to check if a file exists I'm currently using this:

def drive_path_exist(file_path, list = False):
    """
    This is a recursive function to check if the given path exists
    """

    #if the list param is empty set the list as the root of Gdrive
    if list == False:
        list = drive_get_root()

    #split the string to get the first item and check if is in the root
    file_path = file_path.split("/")

    #if there is only one element in the filepath we are at the actual filename
    #so if is in this folder we can return it
    if len(file_path) == 1:
        exist = False
        for elem in list:
            if elem["title"] == file_path[0]:
                #set exist = to the elem because the elem is a dictionary with all the file info
                exist = elem

        return exist
    #if we are not at the last element we have to keep searching
    else:
        exist = False
        for elem in list:
            #check if the current item is in the folder
            if elem["title"] == file_path[0]:
                exist = True
                folder_id = elem["id"]
                #delete the first element and stop at the first match
                file_path.pop(0)
                break

        if exist:
            #recursive call: rejoin the file path as a string and pass the folder's
            #content (from drive_files_in_folder) as the list
            return drive_path_exist("/".join(file_path), drive_files_in_folder(folder_id))
        #the folder was not found, so the path does not exist
        return False

Any idea how to solve my problem? I saw a few discussions here on Stack Overflow, and in some answers people wrote that this is possible, but of course they didn't say how!

Thanks

Upvotes: 12

Views: 16421

Answers (5)

Emilio Estrada

Reputation: 11

I stumbled into this problem recently because I had to verify the existence of a large number of files on Google Drive inside a specified folder and its children.

I created a few classes to handle this:

driveServiceFetcher: Class that handles requests from Google, like getting ALL folders on Drive or fetching all files on specified folders. Nothing very different from what you have here (and based on some solutions I read from this thread)

import os.path

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']

class driveServiceFetcher:
    def __init__(self) -> None:
        self._credentials = self.getDriveCredentials()
        self.service = build('drive', 'v3', credentials=self._credentials)

    def getDriveCredentials(self):
        """Load the user's Drive credentials.
        Reuses token.json when present; otherwise refreshes or runs the OAuth flow.
        """
        creds = None
        # The file token.json stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first
        # time.
        if os.path.exists('token.json'):
            creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open('token.json', 'w') as token:
                token.write(creds.to_json())
        return creds

    def get_all_folders_in_drive(self):
        driveFolders = []

        try:
            page_token = None
            max_allowed_page_size = 1000
            foldersQuery = "trashed = false and mimeType = 'application/vnd.google-apps.folder'"
            while True:
                results = self.service.files().list(
                    pageSize=max_allowed_page_size,
                    fields="nextPageToken, files(id, name, parents)",
                    includeItemsFromAllDrives=False, supportsAllDrives=False,
                    corpora='user',
                    ##driveId=DRIVE_ID, 
                    pageToken=page_token,
                    q=foldersQuery).execute()
                folders = results.get('files', [])
                page_token = results.get('nextPageToken', None)
                for folder in folders:
                    driveFolders.append(folder)
                if page_token is None:
                    break
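        except HttpError as error:
            # Report the failure instead of silently swallowing it; return what was fetched so far.
            print(f"An error occurred while listing folders: {error}")
        return driveFolders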

    def get_all_files_in_folders(self, parentFoldersIds):
        """
        Return a list of file resources (id, name, mimeType, parents) found directly in the given parent folders.
        """
        files = []
        page_token = None
        max_allowed_page_size = 1000
        parentsQuery = buildAllParentsQuery(parentFoldersIds)
        filesQuery = f"mimeType != 'application/vnd.google-apps.folder' and trashed = false and ({parentsQuery})"
        while True:
            results = self.service.files().list(
                pageSize=max_allowed_page_size,
                fields="nextPageToken, files(id, name, mimeType, parents)",
                includeItemsFromAllDrives=False, supportsAllDrives=False,
                # corpora='drive',
                # driveId=DRIVE_ID,
                pageToken=page_token,
                q=filesQuery).execute()
            fetchedFiles = results.get('files', [])
            page_token = results.get('nextPageToken', None)
            for fetchedFile in fetchedFiles:
                files.append(fetchedFile)
            if page_token is None:
                break
        return files
    
def buildAllParentsQuery(parentIds):
    return ' in parents or '.join('"{0}"'.format(f) for f in parentIds) + ' in parents'

if __name__ == '__main__':
    pass
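One thing to watch with buildAllParentsQuery: with many folder IDs the query string may grow too long for a single request. A minimal sketch of splitting the IDs into batches and producing one query per batch is below. The helper names and the batch size of 50 are my own placeholders, not a documented Drive limit, so tune the number for your case.

```python
FOLDER_MIME = "application/vnd.google-apps.folder"


def build_parents_clause(parent_ids):
    """Join folder IDs into "'id1' in parents or 'id2' in parents ..."."""
    return " or ".join("'{0}' in parents".format(pid) for pid in parent_ids)


def batched(ids, size):
    """Yield consecutive slices of ids, each holding at most `size` items."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


def file_queries(parent_ids, batch_size=50):
    """Return one non-folder, non-trashed query string per batch of parents.

    batch_size=50 is an arbitrary placeholder, not a documented limit.
    """
    base = "mimeType != '{0}' and trashed = false".format(FOLDER_MIME)
    return [
        "{0} and ({1})".format(base, build_parents_clause(batch))
        for batch in batched(parent_ids, batch_size)
    ]


# Hypothetical folder IDs, only to show the batching.
queries = file_queries(["id%d" % i for i in range(120)])
print(len(queries))
```

Each query string would then go through the same paginated files().list loop as in get_all_files_in_folders, with the results concatenated.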

driveExplorer: Constructs the Google Drive hierarchy, so we can fetch all the folders once and then do not have to query constantly. I use this to get all the child nodes recursively for any folder I want; that way I can access their IDs and use them in my serviceFetcher.get_all_files_in_folders()

import driveNode

class driveExplorer:
    def __init__(self, rootId, folderNodes:list[driveNode.driveNode]):
        ##ConstructNodes
        self.rootId = rootId
        self._flatHierarchy = folderNodes
        self._createTree()
        self._assignRootNode()
        if self.rootNode is None:
            raise DriveExplorerException
        self._optimizeFlattenedHierarchy()

    def _createTree(self):
        for n in self._flatHierarchy:
            parent = self.getNodeWithId(n.parentId)
            if parent != None:
                parent.children.append(n)
                n.parent = parent
        
    def _assignRootNode(self):
        self.rootNode = self.getNodeWithId(self.rootId)

    def _optimizeFlattenedHierarchy(self):
        print(f"Files in flat hierarchy PRE optimization: {len(self._flatHierarchy)}")
        rootChildren = []
        rootChildren.append(self.rootNode)
        rootChildren.extend(self.getAllChildrenNodes(self.rootNode))
        self._flatHierarchy = rootChildren
        print(f"Files in flat hierarchy POST optimization: {len(self._flatHierarchy)}")

    def getAllChildrenNodes(self, node:driveNode.driveNode):
        nodes = []
        for n in node.children:
            nodes.append(n)
            nodes.extend(self.getAllChildrenNodes(n))
        return nodes

    def getNodeWithId(self, nodeId):
        for n in self._flatHierarchy:
            if n.id == nodeId:
                return n
        return None
    
    ##TODO: Delete? Unused
    def getNodesWithParent(self, parentId): 
        nodes = []
        for n in self._flatHierarchy:
            if n.parentId == parentId:
                nodes.append(n)
        return nodes

    def getNodesWithName(self, nodeName):
        nodes = []
        for n in self._flatHierarchy:
            if n.name == nodeName:
                nodes.append(n)
        return nodes
    
    def getDirectory(self, node:driveNode.driveNode):
        directory = [node]
        currentNode = node
        while currentNode.id != self.rootId:
            if currentNode.parent == None:
                break
            directory.insert(0, currentNode.parent)
            currentNode = currentNode.parent
        return directory

class DriveExplorerException(Exception):
    pass
            
def folderListToNodeList(folders):
    nodes = []
    for folder in folders:
        parents = folder.get('parents', None)
        if parents is not None:
            parent = parents[0]
        else:
            parent = None
        newNode = driveNode.driveNode(folder["name"],
        folder["id"],
        parent)
        nodes.append(newNode)
    return nodes

if __name__ == '__main__':
    pass

driveNode: A class with the Google Drive file info: Name, Id, parentId, array of Children(nodes) and the parent(node)

class driveNode:
    def __init__(self, name, id, parentId) -> None:
        self.name = name
        self.id = id
        self.parentId = parentId
        self.children = []
        self.parent = None

    

if __name__ == '__main__':
    pass

That way I can look for files recursively by doing something like this:

##ROOT_NODE_ID, the drive explorer requires a folder to be the "parent node"
driveFetcher = driveServiceFetcher()
allFolders = driveFetcher.get_all_folders_in_drive()
explorer = driveExplorer(ROOT_NODE_ID, folderListToNodeList(allFolders))

...

nodesToCrawl = explorer.getAllChildrenNodes(folder)
idsToCrawl = []
for n in nodesToCrawl:
    idsToCrawl.append(n.id)
files = driveFetcher.get_all_files_in_folders(idsToCrawl)
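For comparison, the descendant lookup can also be sketched without the node classes, using a dictionary from parent ID to child IDs so that no step scans the whole folder list. This is only an illustration, assuming v3-style folder records with id and parents keys; the function name and sample data are made up. Unlike getAllChildrenNodes, it includes the starting folder in the result.

```python
from collections import defaultdict, deque


def descendant_folder_ids(folders, start_id):
    """Return start_id followed by the IDs of every folder nested under it."""
    # Index child folder IDs by parent ID in one pass over the flat list.
    children = defaultdict(list)
    for folder in folders:
        for parent_id in folder.get("parents", []):
            children[parent_id].append(folder["id"])

    # Breadth-first walk; `seen` guards against visiting a folder twice.
    seen = {start_id}
    ordered = [start_id]
    queue = deque([start_id])
    while queue:
        for child_id in children.get(queue.popleft(), []):
            if child_id not in seen:
                seen.add(child_id)
                ordered.append(child_id)
                queue.append(child_id)
    return ordered


# Hypothetical folder records standing in for get_all_folders_in_drive().
folders = [
    {"id": "a", "name": "A", "parents": ["root"]},
    {"id": "b", "name": "B", "parents": ["a"]},
    {"id": "c", "name": "C", "parents": ["a"]},
    {"id": "d", "name": "D", "parents": ["b"]},
    {"id": "x", "name": "X", "parents": ["root"]},
]
print(descendant_folder_ids(folders, "a"))  # ['a', 'b', 'c', 'd']
```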

Upvotes: 1

pinoyyid

Reputation: 22286

To build a representation of the tree in your app, you need to do the following:

  1. Run a Drive List query to retrieve all Folders
  2. Iterate the result array and examine the parents property to build an in-memory hierarchy
  3. Run a second Drive List query to get all non-folders (ie. files)
  4. For each file returned, place it in your in-memory tree
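The steps above can be sketched roughly as follows. This is a minimal illustration on in-memory data, not tied to a particular client library: it assumes each record has id, name and parents (a list of parent IDs), which is the v3 shape; with v2 the name field is title and the parents entries look different. The function names and sample records are made up.

```python
from collections import defaultdict


def build_children_index(items):
    """Map each parent ID to the list of items that name it as a parent."""
    children = defaultdict(list)
    for item in items:
        for parent_id in item.get("parents", []):
            children[parent_id].append(item)
    return children


def resolve_path(children, root_id, path):
    """Walk 'a/b/c' down from root_id; return the matching item or None."""
    current_id, found = root_id, None
    for part in path.strip("/").split("/"):
        candidates = children.get(current_id, [])
        found = next((c for c in candidates if c["name"] == part), None)
        if found is None:
            return None
        current_id = found["id"]
    return found


# Hypothetical results of the two list queries (folders first, then files).
folders = [
    {"id": "f1", "name": "folder1", "parents": ["root"]},
    {"id": "f2", "name": "folder2", "parents": ["f1"]},
]
files = [{"id": "t1", "name": "test.txt", "parents": ["f2"]}]

index = build_children_index(folders + files)
hit = resolve_path(index, "root", "folder1/folder2/test.txt")
print(hit["id"] if hit else "not found")
```

In real responses the parents field holds the root folder's actual ID rather than the alias "root", so you would look that ID up once (for example with a Files Get on 'root') and pass it as root_id. If several siblings share a name, this sketch just takes the first match.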

If you simply want to check if file-A exists in folder-B, the approach depends on whether the name "folder-B" is guaranteed to be unique.

If it's unique, just do a FilesList query for title='file-A', then do a Files Get for each of its parents and see if any of them are called 'folder-B'.

You don't say if these files and folders are being created by your app, or by the user with the Google Drive Webapp. If your app is the creator of these files/folders there is a trick you can use to restrict your searches to a single root. Say you have

MyDrive/app_root/folder-C/folder-B/file-A

you can make all of folder-C, folder-B and file-A children of app_root

That way you can constrain all of your queries to include

and 'app_root_id' in parents

NB. A previous version of this answer highlighted that Drive folders were not constrained to an inverted tree hierarchy, because a single folder could have multiple parents. As of 2021, this is no longer true and a Drive File (including Folders, which are simply special files) can only be created with a single parent.

Upvotes: 13

Rotem jackoby

Reputation: 22058

I agree with @pinoyyid - Google drive is not a typical tree structure.

BUT, for printing the folder structure I would still consider using a tree visualization library (for example, treelib).

Below is a full solution for printing your Google Drive file system recursively.

from treelib import Node, Tree

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive

gauth = GoogleAuth()
gauth.LocalWebserverAuth()
drive = GoogleDrive(gauth)

### Helper functions ### 
def get_children(root_folder_id):
    query = "'%s' in parents and trashed=false" % root_folder_id
    file_list = drive.ListFile({'q': query}).GetList()
    return file_list

def get_folder_id(root_folder_id, root_folder_title):
    file_list = get_children(root_folder_id)
    for file in file_list:
        if(file['title'] == root_folder_title):
            return file['id']

def add_children_to_tree(tree, file_list, parent_id):
    for file in file_list:
        tree.create_node(file['title'], file['id'], parent=parent_id)
        print('parent: %s, title: %s, id: %s' % (parent_id, file['title'], file['id']))

### Recursion over all children ### 
def populate_tree_recursively(tree,parent_id):
    children = get_children(parent_id)
    add_children_to_tree(tree, children, parent_id)
    if(len(children) > 0):
        for child in children:
            populate_tree_recursively(tree, child['id'])


### Create tree and start populating from root ###
def main():
    root_folder_title = "your-root-folder"
    root_folder_id = get_folder_id("root", root_folder_title)

    tree = Tree()
    tree.create_node(root_folder_title, root_folder_id)
    populate_tree_recursively(tree, root_folder_id)
    tree.show()

if __name__ == "__main__":
    main()

Upvotes: 1

Zig Mandel

Reputation: 19835

This will never work like that except for very small trees. You have to rethink your entire algorithm for a cloud app (you have written it like a desktop app where you own the machine), since it will time out easily. You need to mirror the tree beforehand (task queues and datastore), not just to avoid timeouts but also to avoid Drive rate limits, and keep it in sync somehow (register for push notifications, etc.). Not easy at all. I've done a Drive tree viewer before.
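To give a rough idea of the "keep it in sync" part, here is a minimal sketch of applying a batch of change records to a local mirror held as a dictionary. The record shape (fileId, removed, file) is modeled on what the v3 changes feed returns, but treat it as an assumption and check it against the responses you actually get. The function name and sample data are made up, and real storage would be a datastore rather than a dict.

```python
def apply_changes(mirror, changes):
    """Update a {file_id: metadata} mirror in place from change records."""
    for change in changes:
        file_id = change["fileId"]
        meta = change.get("file")
        # Drop entries that were removed, trashed, or have no metadata left.
        if change.get("removed") or meta is None or meta.get("trashed"):
            mirror.pop(file_id, None)
        else:
            mirror[file_id] = meta
    return mirror


# Hypothetical mirror and change batch.
mirror = {
    "a": {"id": "a", "name": "old name", "parents": ["root"]},
    "b": {"id": "b", "name": "B", "parents": ["root"]},
}
changes = [
    {"fileId": "a", "removed": False,
     "file": {"id": "a", "name": "new name", "parents": ["root"]}},
    {"fileId": "b", "removed": True},
    {"fileId": "c", "removed": False,
     "file": {"id": "c", "name": "C", "parents": ["a"]}},
]
apply_changes(mirror, changes)
print(sorted(mirror))
```

After each batch you would store the page token that came with it, so the next sync only asks for what changed since then.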

Upvotes: 2

idobatter

Reputation: 99

An easy way to check if a file exists in a specific path is: drive_service.files().list(q="'THE_ID_OF_SPECIFIC_PATH' in parents and title='a file'").execute()
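If the file name can contain a single quote, the query string needs it escaped as \'. A small sketch of a helper that builds the q value is below. The helper names are made up, only single quotes are handled, and the extra trashed = false condition is my own addition. The name field is title in v2 (as in this answer) and name in v3.

```python
def escape_query_value(value):
    """Escape single quotes so the value can sit inside a quoted query term."""
    return value.replace("'", "\\'")


def name_in_folder_query(name, folder_id, name_field="title"):
    """Build a q string matching `name` directly inside folder `folder_id`."""
    return "'{0}' in parents and {1} = '{2}' and trashed = false".format(
        escape_query_value(folder_id), name_field, escape_query_value(name)
    )


print(name_in_folder_query("it's a file", "FOLDER_ID"))
# 'FOLDER_ID' in parents and title = 'it\'s a file' and trashed = false
```

The resulting string would be passed as q to files().list(); an empty result list means the file does not exist in that folder.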

To walk all folders and files:

import sys, os
import socket

import googleDriveAccess

import logging
logging.basicConfig()

FOLDER_TYPE = 'application/vnd.google-apps.folder'

def getlist(ds, q, **kwargs):
  result = None
  npt = ''
  while npt is not None:
    if npt != '': kwargs['pageToken'] = npt
    entries = ds.files().list(q=q, **kwargs).execute()
    if result is None: result = entries
    else: result['items'] += entries['items']
    npt = entries.get('nextPageToken')
  return result

def uenc(u):
  if isinstance(u, unicode): return u.encode('utf-8')
  else: return u

def walk(ds, folderId, folderName, outf, depth):
  spc = ' ' * depth
  outf.write('%s+%s\n%s  %s\n' % (spc, uenc(folderId), spc, uenc(folderName)))
  q = "'%s' in parents and mimeType='%s'" % (folderId, FOLDER_TYPE)
  entries = getlist(ds, q, **{'maxResults': 200})
  for folder in entries['items']:
    walk(ds, folder['id'], folder['title'], outf, depth + 1)
  q = "'%s' in parents and mimeType!='%s'" % (folderId, FOLDER_TYPE)
  entries = getlist(ds, q, **{'maxResults': 200})
  for f in entries['items']:
    outf.write('%s -%s\n%s   %s\n' % (spc, uenc(f['id']), spc, uenc(f['title'])))

def main(basedir):
  da = googleDriveAccess.DAClient(basedir) # clientId=None, script=False
  f = open(os.path.join(basedir, 'hierarchy.txt'), 'wb')
  walk(da.drive_service, 'root', u'root', f, 0)
  f.close()

if __name__ == '__main__':
  logging.getLogger().setLevel(getattr(logging, 'INFO'))
  try:
    main(os.path.dirname(__file__))
  except (socket.gaierror, ) as e:
    sys.stderr.write('socket.gaierror')

This uses googleDriveAccess: github.com/HatsuneMiku/googleDriveAccess

Upvotes: 1
