Anton Kozlov
Anton Kozlov

Reputation: 21

Youtube Data API nextPageToken loop in Python

I pieced this together from a number of different examples I found online.

The goal is to:

  1. search in the youtube api
  2. turn search results from multiple pages into a csv file

edit: here's a working example of the search loop, thanks to one of the answers provided. This now loops the maximum number of times (10) as intended; however, when executed, the problem now is the CSV file.

It seems that after `response` is called, the program finishes, even though there are calls to `results` and `writeCSV` afterwards.

Any further help would be greatly appreciated!

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import argparse

# API configuration. DEVELOPER_KEY is a placeholder -- replace it with a real
# key from the Google Cloud console before running.
DEVELOPER_KEY = "dev-key"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

# Module-level Data API client, shared by youtubeSearch() below.
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)


# -------------Build YouTube Search------------#
def youtubeSearch(query, order="relevance"):
    """Search YouTube for ``query`` and collect per-video statistics.

    Pages through every Search.list result page via ``list_next()`` and, for
    each video found, issues a Videos.list call to gather its statistics.

    Args:
        query: free-text search term.
        order: result ordering accepted by Search.list (default "relevance").

    Returns:
        dict of parallel lists keyed by field name (title, videoId,
        viewCount, likeCount, ...), one entry per video.
    """
    # search 50 results per page. 'pageInfo' must be listed in the fields
    # mask explicitly -- the partial-response filter strips anything not
    # listed, so the summary printed below would otherwise KeyError.
    request = youtube.search().list(
        q=query,
        type="video",
        order=order,
        part="id,snippet",
        maxResults="50",
        relevanceLanguage='en',
        videoDuration='long',
        fields='nextPageToken, pageInfo, items(id,snippet)'
    )

    title = []
    channelId = []
    channelTitle = []
    categoryId = []
    videoId = []
    viewCount = []
    likeCount = []
    dislikeCount = []
    commentCount = []
    favoriteCount = []
    tags = []

    # Keep the first page for the post-loop summary. After the loop
    # `request` is None (that is what ends the loop), so it must NOT be
    # indexed -- doing so crashed the original script with a TypeError
    # before writeCSV was ever reached.
    first_response = None

    while request:
        response = request.execute()
        if first_response is None:
            first_response = response
        for search_result in response.get("items", []):
            if search_result["id"]["kind"] != "youtube#video":
                continue

            # append title and video for each item
            title.append(search_result['snippet']['title'])
            videoId.append(search_result['id']['videoId'])

            # then collect stats on each video using videoId
            stats = youtube.videos().list(
                part='statistics, snippet',
                id=search_result['id']['videoId']).execute()
            # Hoist the repeated stats['items'][0] lookup.
            info = stats['items'][0]

            channelId.append(info['snippet']['channelId'])
            channelTitle.append(info['snippet']['channelTitle'])
            categoryId.append(info['snippet']['categoryId'])
            favoriteCount.append(info['statistics']['favoriteCount'])
            viewCount.append(info['statistics']['viewCount'])

            # Not every video has likes/dislikes enabled so they won't
            # appear in the JSON response; catch only the missing key
            # instead of a bare except that would hide real errors.
            try:
                likeCount.append(info['statistics']['likeCount'])
            except KeyError:
                # Good to be aware of Channels that turn off their Likes
                print("Video titled {0}, on Channel {1} Likes Count is not available".format(
                    info['snippet']['title'],
                    info['snippet']['channelTitle']))
                print(info['statistics'].keys())
                # Appends "Not Available" to keep dictionary values aligned
                likeCount.append("Not available")

            try:
                dislikeCount.append(info['statistics']['dislikeCount'])
            except KeyError:
                # Good to be aware of Channels that turn off their Dislikes
                print("Video titled {0}, on Channel {1} Dislikes Count is not available".format(
                    info['snippet']['title'],
                    info['snippet']['channelTitle']))
                print(info['statistics'].keys())
                dislikeCount.append("Not available")

            # Comments may be disabled, so default to 0 when absent.
            if 'commentCount' in info['statistics']:
                commentCount.append(info['statistics']['commentCount'])
            else:
                commentCount.append(0)

            if 'tags' in info['snippet']:
                tags.append(info['snippet']['tags'])
            else:
                # I'm not a fan of empty fields
                tags.append("No Tags")

        # list_next() returns None once there is no next page, which
        # terminates the while-loop cleanly.
        request = youtube.search().list_next(
            request, response)

    # Store the collected parallel lists in a single dictionary.
    youtube_dict = {'tags': tags, 'channelId': channelId, 'channelTitle': channelTitle,
                    'categoryId': categoryId, 'title': title, 'videoId': videoId,
                    'viewCount': viewCount, 'likeCount': likeCount, 'dislikeCount': dislikeCount,
                    'commentCount': commentCount, 'favoriteCount': favoriteCount}

    print("Search Completed...")
    if first_response is not None:
        # Summary must come from the saved first page, NOT from `request`,
        # which is None at this point.
        print("Total results: {0} \nResults per page: {1}".format(
            first_response['pageInfo']['totalResults'],
            first_response['pageInfo']['resultsPerPage']))
        print("Example output per item, snippet")
        print(first_response['items'][0]['snippet'].keys())

        # Fresh names here: the original reused `title`/`channelId`, which
        # clobbered the result lists of the same names.
        first_snippet = first_response['items'][0]['snippet']
        print("First result is: \n Title: {0} \n Channel ID: {1} \n Published on: {2}".format(
            first_snippet['title'], first_snippet['channelId'], first_snippet['publishedAt']))
    return youtube_dict


# Input query
print("Please input your search query")
q = input()
# Run YouTube Search
results = youtubeSearch(q)
# Display result titles
# NOTE(review): assumes the search returned at least three videos; fewer
# results will raise IndexError here -- confirm that's acceptable.
print("Top 3 results are: \n {0}, ({1}), \n {2}, ({3}),\n {4}, ({5})".format(results['title'][0],
                                                                             results['channelTitle'][0],
                                                                             results['title'][1],
                                                                             results['channelTitle'][1],
                                                                             results['title'][2],
                                                                             results['channelTitle'][2]))

# -------------------------Save results------------------------------#
print("Input filename to store csv file")
# Windows-style path rooted at the current drive ("\YouTube\<name>.csv");
# the \YouTube directory must already exist or open() will fail.
file = "\\YouTube\\" + input() + ".csv"


def writeCSV(results, filename):
    """Write a dict of equal-length lists to *filename* as CSV.

    The sorted dictionary keys become the header row; each subsequent row
    holds one element from every list, in key order.
    """
    import csv
    columns = sorted(results)
    with open(filename, "w", newline="", encoding="utf-8") as handle:
        sink = csv.writer(handle, delimiter=",")
        sink.writerow(columns)
        for record in zip(*(results[col] for col in columns)):
            sink.writerow(record)


# Persist the search results and report the destination path.
writeCSV(results, file)
print("CSV file has been uploaded at: " + str(file))

Upvotes: 2

Views: 1836

Answers (2)

Anton Kozlov
Anton Kozlov

Reputation: 21

I worked out a solution after a few different tests. I wasn't able to implement the pythonic solution suggested but this worked for me.

import pandas as pd
import os
import webvtt
import csv

import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow

# OAuth client configuration: the client_secrets.json downloaded from the
# Google Cloud console, and the YouTube read/write scope.
CLIENT_SECRETS_FILE = "client_secrets.json"
SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'

def get_authenticated_service():
    """Run the installed-app OAuth console flow and return a YouTube client."""
    oauth_flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
    creds = oauth_flow.run_console()
    return build(API_SERVICE_NAME, API_VERSION, credentials=creds)

# Remove keyword arguments that are not set
# Remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    """Return a copy of *kwargs* with all falsy values dropped.

    Used to strip unset API parameters (None, '', 0, ...) before passing
    the remainder to a Search.list request.
    """
    # ``**kwargs`` is always a dict, so the original ``is not None`` guard
    # was dead code; a comprehension replaces the manual accumulator loop.
    return {key: value for key, value in kwargs.items() if value}

# Build the authenticated client once at import time (this triggers the
# interactive OAuth console flow).
client = get_authenticated_service()

def youtube_keyword(client, **kwargs):
    """Execute one Search.list call, dropping any unset keyword arguments."""
    cleaned = remove_empty_kwargs(**kwargs)
    search_request = client.search().list(**cleaned)
    return search_request.execute()

def youtube_search(criteria, max_res):
    """Collect up to roughly ``max_res`` search results into a DataFrame.

    Pages through Search.list responses 50 at a time, accumulating each
    hit's title, channel title, and video id.

    Args:
        criteria: free-text search query.
        max_res: stop once at least this many titles have been collected.

    Returns:
        pandas.DataFrame with columns title, channelId, videoId, subject.
    """
    # create lists and empty dataframe
    titles = []
    videoIds = []
    channelIds = []
    resp_df = pd.DataFrame()

    # The page token must persist ACROSS iterations. The original code set
    # ``token = None`` inside the loop, so every request re-fetched page 1
    # and the loop accumulated duplicates forever.
    token = None
    while len(titles) < max_res:
        response = youtube_keyword(client,
                                   part='id,snippet',
                                   maxResults=50,
                                   q=criteria,
                                   videoCaption='closedCaption',
                                   type='video',
                                   videoDuration='long',
                                   pageToken=token)

        for item in response['items']:
            titles.append(item['snippet']['title'])
            channelIds.append(item['snippet']['channelTitle'])
            videoIds.append(item['id']['videoId'])

        # The final page has no nextPageToken: .get() avoids the KeyError
        # the original raised there, and the break stops us from looping on
        # an exhausted result set.
        token = response.get("nextPageToken")
        if token is None:
            break

    resp_df['title'] = titles
    resp_df['channelId'] = channelIds
    resp_df['videoId'] = videoIds
    resp_df['subject'] = criteria

    return resp_df

# Run the search (replace '[search criteria]' with a real query), capping
# collection at 1000 titles.
Found_Videos = youtube_search('[search criteria]',1000)
# NOTE(review): .shape and .head() only display output in a REPL/notebook;
# as plain script statements their results are discarded.
Found_Videos.shape

Found_Videos.head()
Found_Videos.to_csv('Found_Videos.csv')

Upvotes: 0

stvar
stvar

Reputation: 6975

Since you're using the Google's APIs Client Library for Python, the pythonic way of implementing result set pagination on the Search.list API endpoint looks like the one below:

# Build the first Search.list request; later pages are derived from it.
request = youtube.search().list(
    q = 'A query',
    part = 'id,snippet',
    type = 'video',
    maxResults = 50,
    relevanceLanguage = 'en',
    videoDuration = 'long'
)

# list_next() returns None when there is no further page, so the loop
# condition itself ends the pagination -- no manual pageToken handling.
while request:
    response = request.execute()

    for item in response['items']:
        ...

    request = youtube.search().list_next(
        request, response)

It is this simple due to the way the Python client library is implemented: there's no need to handle explicitly the API response object's property nextPageToken and the API request parameter pageToken at all.

Upvotes: 4

Related Questions