Reputation: 21
I pieced this together from a number of different examples I found online. The goal is to search YouTube for a given query, collect statistics on each video returned, and save the results to a CSV file.
Edit: here's a working example of the search loop, thanks to one of the answers provided. It now loops the maximum number of times (10) as intended; however, when executed, the problem is now the CSV file. It seems that after response is retrieved, the program finishes even though there are calls to results and writeCSV afterwards.
Any further help would be greatly appreciated!
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import argparse
DEVELOPER_KEY = "dev-key"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)
# -------------Build YouTube Search------------#
def youtubeSearch(query, order="relevance"):
    # Search 50 results per page
    request = youtube.search().list(
        q=query,
        type="video",
        order=order,
        part="id,snippet",
        maxResults="50",
        relevanceLanguage='en',
        videoDuration='long',
        fields='nextPageToken, items(id,snippet)'
    )
    title = []
    channelId = []
    channelTitle = []
    categoryId = []
    videoId = []
    viewCount = []
    likeCount = []
    dislikeCount = []
    commentCount = []
    favoriteCount = []
    tags = []
    category = []
    videos = []
    while request:
        response = request.execute()
        for search_result in response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":
                # Append title and video ID for each item
                title.append(search_result['snippet']['title'])
                videoId.append(search_result['id']['videoId'])
                # Then collect stats on each video using its videoId
                stats = youtube.videos().list(
                    part='statistics, snippet',
                    id=search_result['id']['videoId']).execute()
                channelId.append(stats['items'][0]['snippet']['channelId'])
                channelTitle.append(stats['items'][0]['snippet']['channelTitle'])
                categoryId.append(stats['items'][0]['snippet']['categoryId'])
                favoriteCount.append(stats['items'][0]['statistics']['favoriteCount'])
                viewCount.append(stats['items'][0]['statistics']['viewCount'])
                # Not every video has likes/dislikes enabled, so they won't appear in the JSON response
                try:
                    likeCount.append(stats['items'][0]['statistics']['likeCount'])
                except KeyError:
                    # Good to be aware of channels that turn off their likes
                    print("Video titled {0}, on Channel {1} Likes Count is not available".format(
                        stats['items'][0]['snippet']['title'],
                        stats['items'][0]['snippet']['channelTitle']))
                    print(stats['items'][0]['statistics'].keys())
                    # Append "Not available" to keep the dictionary values aligned
                    likeCount.append("Not available")
                try:
                    dislikeCount.append(stats['items'][0]['statistics']['dislikeCount'])
                except KeyError:
                    # Good to be aware of channels that turn off their dislikes
                    print("Video titled {0}, on Channel {1} Dislikes Count is not available".format(
                        stats['items'][0]['snippet']['title'],
                        stats['items'][0]['snippet']['channelTitle']))
                    print(stats['items'][0]['statistics'].keys())
                    dislikeCount.append("Not available")
                # Comments can be disabled, so append the count if it exists, otherwise 0.
                # It's not uncommon to disable comments, so no need to wrap in try/except
                if 'commentCount' in stats['items'][0]['statistics'].keys():
                    commentCount.append(stats['items'][0]['statistics']['commentCount'])
                else:
                    commentCount.append(0)
                if 'tags' in stats['items'][0]['snippet'].keys():
                    tags.append(stats['items'][0]['snippet']['tags'])
                else:
                    # I'm not a fan of empty fields
                    tags.append("No Tags")
        request = youtube.search().list_next(
            request, response)
    # After the loop, store the lists of values in a dictionary
    youtube_dict = {'tags': tags, 'channelId': channelId, 'channelTitle': channelTitle,
                    'categoryId': categoryId, 'title': title, 'videoId': videoId,
                    'viewCount': viewCount, 'likeCount': likeCount, 'dislikeCount': dislikeCount,
                    'commentCount': commentCount, 'favoriteCount': favoriteCount}
    print("Search Completed...")
    print("Total results: {0} \nResults per page: {1}".format(request['pageInfo']['totalResults'],
                                                              request['pageInfo']['resultsPerPage']))
    print("Example output per item, snippet")
    print(request['items'][0]['snippet'].keys())
    # Assign first page of results (items) to item variable
    items = request['items']  # 50 "items"
    # Assign 1st result's title, channelId, datePublished, then print
    title = items[0]['snippet']['title']
    channelId = items[0]['snippet']['channelId']
    datePublished = items[0]['snippet']['publishedAt']
    print("First result is: \n Title: {0} \n Channel ID: {1} \n Published on: {2}".format(title, channelId,
                                                                                          datePublished))
    return youtube_dict
# Input query
print("Please input your search query")
q = input()
# Run YouTube Search
results = youtubeSearch(q)
# Display result titles
print("Top 3 results are: \n {0}, ({1}), \n {2}, ({3}),\n {4}, ({5})".format(results['title'][0],
results['channelTitle'][0],
results['title'][1],
results['channelTitle'][1],
results['title'][2],
results['channelTitle'][2]))
# -------------------------Save results------------------------------#
print("Input filename to store csv file")
file = "\\YouTube\\" + input() + ".csv"
def writeCSV(results, filename):
    import csv
    keys = sorted(results.keys())
    with open(filename, "w", newline="", encoding="utf-8") as output:
        writer = csv.writer(output, delimiter=",")
        writer.writerow(keys)
        writer.writerows(zip(*[results[key] for key in keys]))
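# Caveat: zip(*columns) stops at the shortest column, so if any of the lists
# in results ends up shorter than the others, whole rows are silently
# dropped from the CSV.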
writeCSV(results, file)
print("CSV file has been uploaded at: " + str(file))
Upvotes: 2
Views: 1836
Reputation: 21
I worked out a solution after a few different tests. I wasn't able to implement the pythonic solution suggested, but this worked for me.
import pandas as pd
import os
import webvtt
import csv
import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow
CLIENT_SECRETS_FILE = "client_secrets.json"
SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'
def get_authenticated_service():
    flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
    credentials = flow.run_console()
    return build(API_SERVICE_NAME, API_VERSION, credentials=credentials)
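# Note: flow.run_console() has since been removed in newer google-auth-oauthlib
# releases; flow.run_local_server() is the replacement there.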
# Remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    good_kwargs = {}
    if kwargs is not None:
        for key, value in kwargs.items():
            if value:
                good_kwargs[key] = value
    return good_kwargs
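# For example, remove_empty_kwargs(q='query', pageToken=None) returns
# {'q': 'query'}, so unset parameters never reach the API call.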
client = get_authenticated_service()
def youtube_keyword(client, **kwargs):
    kwargs = remove_empty_kwargs(**kwargs)
    response = client.search().list(
        **kwargs
    ).execute()
    return response
def youtube_search(criteria, max_res):
    # Create lists and an empty dataframe
    titles = []
    videoIds = []
    channelIds = []
    resp_df = pd.DataFrame()
    # The page token has to persist across iterations; resetting it inside
    # the loop would fetch the first page over and over.
    token = None
    while len(titles) < max_res:
        response = youtube_keyword(client,
                                   part='id,snippet',
                                   maxResults=50,
                                   q=criteria,
                                   videoCaption='closedCaption',
                                   type='video',
                                   videoDuration='long',
                                   pageToken=token)
        for item in response['items']:
            titles.append(item['snippet']['title'])
            channelIds.append(item['snippet']['channelTitle'])  # note: this stores the channel title
            videoIds.append(item['id']['videoId'])
        # Stop cleanly when there are no more pages
        token = response.get("nextPageToken")
        if token is None:
            break
    resp_df['title'] = titles
    resp_df['channelId'] = channelIds
    resp_df['videoId'] = videoIds
    resp_df['subject'] = criteria
    return resp_df
Found_Videos = youtube_search('[search criteria]', 1000)
Found_Videos.shape
Found_Videos.head()
Found_Videos.to_csv('Found_Videos.csv')
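Since the same video can occasionally show up on more than one results page, an optional extra step before saving is to drop duplicate videoIds (a small addition on top of the code above, using pandas):
Found_Videos = Found_Videos.drop_duplicates(subset='videoId').reset_index(drop=True)
Found_Videos.to_csv('Found_Videos.csv', index=False)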
Upvotes: 0
Reputation: 6975
Since you're using Google's APIs Client Library for Python, the pythonic way of implementing result-set pagination on the Search.list
API endpoint looks like the one below:
request = youtube.search().list(
    q = 'A query',
    part = 'id,snippet',
    type = 'video',
    maxResults = 50,
    relevanceLanguage = 'en',
    videoDuration = 'long'
)

while request:
    response = request.execute()

    for item in response['items']:
        ...

    request = youtube.search().list_next(
        request, response)
It is this simple due to the way the Python client library is implemented: there's no need to explicitly handle the API response's nextPageToken
property or the API request's pageToken
parameter at all.
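For comparison, this is roughly what list_next does for you behind the scenes, sketched as the manual equivalent where each response's nextPageToken is fed back in as pageToken:
token = None
while True:
    response = youtube.search().list(
        q = 'A query',
        part = 'id,snippet',
        type = 'video',
        maxResults = 50,
        pageToken = token  # None on the very first call
    ).execute()

    for item in response['items']:
        ...  # process each result

    token = response.get('nextPageToken')
    if not token:
        break  # no more pages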
Upvotes: 4