I have a Python script that processes a batch of CSV files in parallel. Each input CSV has exactly 200 rows, so I use an if num_videos == 200: check to run a function like:
if num_videos == 200:
    all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
    output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
    save_all_data_to_csv(all_data, output_csv_path)
Inside process_videos_in_batches, I add debug lines:
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")
And inside save_all_data_to_csv:
final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print("Debug: writing CSV to", output_csv_path)
For some of the output CSV files, I see the debug prints (and the file contains many rows, as expected). However, for other files, the CSV is created but ends up empty. Even more strangely, the debug prints for these “empty” files never appear—so it looks like the code isn’t running that section fully, yet a CSV still appears. Furthermore, it seems unlikely that the process inside process_videos_in_batches
would result in a CSV with 0 rows, but in reality, some output CSV files have 0 rows (the full code of the main function and parts of the outputs are provided at the end). I have confirmed that files where the lines
print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")
are correctly printed have content, whereas files where these lines are not printed are empty.
(Every input file has exactly 200 rows, so the if check should pass for every file.)
My environment is:
Why might some files run the full code path (with debug prints and data in the CSV) while others apparently skip the debug prints and produce an empty CSV—despite the if
condition being satisfied for all files? Could something in my parallel execution be causing the function to exit early or fail silently for certain files, even though the CSV is still created?
Any tips or insights would be greatly appreciated. Thanks in advance!
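For reference, here is a minimal per-worker logging sketch I could use to rule out lost output from the child processes (the log directory and helper name are placeholders, not part of my current code):
import os
import datetime

# Hypothetical helper (not in my current code): each worker process appends to its
# own file keyed by PID, so debug output cannot be lost even if prints from child
# processes never reach the notebook/terminal.
def log_worker(message, log_dir="worker_logs"):  # log_dir is a placeholder path
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, f"worker_{os.getpid()}.log")
    timestamp = datetime.datetime.now().isoformat()
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(f"{timestamp} {message}\n")
I would call log_worker(...) in the same places as the Debug prints inside process_videos_in_batches and save_all_data_to_csv.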
--- the full code of the main functions ---
process_videos_in_batches
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
    all_data = []
    all_video_ids = video_id_list[:]
    while all_video_ids:
        batch_video_ids = all_video_ids[:batch_size]
        all_video_ids = all_video_ids[batch_size:]
        videos_info = get_video_data(batch_video_ids, API_KEY)
        ### Debug ###
        print(f'Debug: {videos_info}, {batch_video_ids}')
        #############
        # If videos_info is None, skip to the next batch
        if videos_info is None:
            continue
        for video_id in batch_video_ids:
            video_data = videos_info.get(video_id) if videos_info else None
            if video_data:
                comments = get_all_video_comments(video_id, API_KEY)
                for comment in comments:
                    # Copy the parent comment information into the entry
                    video_entry = video_data.copy()
                    video_entry.update({
                        'comment': comment.get('comment', pd.NA),
                        'comment_like_count': comment.get('comment_like_count', pd.NA),
                        'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
                        'comment_published_at': comment.get('comment_published_at', pd.NA),
                        'updated_at': comment.get('updated_at', pd.NA),
                        'has_reply': comment.get('has_reply', 'no'),
                        'reply_comment': comment.get('reply_comment', pd.NA),
                        'reply_like_count': comment.get('reply_like_count', pd.NA),
                        'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
                        'reply_published_at': comment.get('reply_published_at', pd.NA),
                        'reply_updated_at': comment.get('reply_updated_at', pd.NA)
                    })
                    all_data.append(video_entry)
    ### Debug ###
    print(f'Debug: {len(all_data)}')
    print(f'Debug: {all_data[:1]}')
    #############
    return all_data
save_all_data_to_csv
def save_all_data_to_csv(all_data, output_csv_path):
    # Convert to a DataFrame
    final_df = pd.DataFrame(all_data)
    # Save to CSV
    final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
    ### Debug ###
    print("Debug: writing CSV to", output_csv_path)
    #############
process_single_file
def process_single_file(file_path, output_dir, batch_size, API_KEY):
    """
    Process one CSV file and, if it qualifies, write the result to an output CSV.
    """
    df = pd.read_csv(file_path, encoding='utf-8')
    video_ids = list(df['related_video_id'])
    num_video = len(video_ids)
    if num_video == 200:
        all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
        output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
        save_all_data_to_csv(all_data, output_csv_path)
    # Return the file path when done (serves as a log)
    return file_path
The main process:
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]
# Run in parallel
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    for file_path in file_paths:
        # Run process_single_file in parallel
        future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
        futures.append(future)
    # Retrieve results in the order tasks complete
    # To track how many tasks have finished with tqdm, wrap the progress bar around as_completed
    for f in tqdm(as_completed(futures)):
        try:
            finished_file = f.result()
        except Exception as e:
            print("Error happened in worker:", e)
Outputs:
Debug: {'lAtasG8EVEg': {'video_id': 'lAtasG8EVEg'...
...
Debug: 22058
Debug: [{'video_id': '8BtA6fO93_w'...
Debug: writing CSV to /home/foo/mnt/vt/related_videos/data_info/C_dsxOR9JJw.csv
Minimal Reproducible Example
import os
import time
from concurrent.futures import ProcessPoolExecutor

def example_task(x):
    time.sleep(1)  # Simulate some work
    return x ** 2

def process_files(files, num_workers):
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(example_task, files))
    return results

if __name__ == "__main__":
    # Input data to simulate files
    files = [1, 2, 3, 4, 5]
    # Test with multiprocessing
    num_workers = 4  # Adjust the number of workers
    try:
        output = process_files(files, num_workers)
        print("Output:", output)
    except Exception as e:
        print("Error:", e)
Full Code
import os
import requests
import pandas as pd
import time
from tqdm.notebook import tqdm
from datetime import datetime, timedelta, timezone
from concurrent.futures import ProcessPoolExecutor, as_completed
# Retrieve API key and paths from environment variables
API_KEY = os.getenv('YOUTUBE_API_KEY')
batch_size = 50
mnt_path = os.getenv('MNT_PATH')
root_dir = os.path.join(mnt_path, 'related_videos')
# Create directories for thumbnails and output data
thumbnail_dir = os.path.join(root_dir, 'thumbnails')
os.makedirs(thumbnail_dir, exist_ok=True)
output_dir = os.path.join(root_dir, 'data_info')
os.makedirs(output_dir, exist_ok=True)
# Check if the output directory exists
if not os.path.exists(output_dir):
    print(f"Failed to create directory: {output_dir}")
# Print the current time in JST (Japan Standard Time)
def show_now_time():
    jst = timezone(timedelta(hours=9))
    jst_time = datetime.now(jst)
    print(jst_time)
# Convert UTC time string to JST time string
def convert_to_jst(utc_time_str):
    utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%SZ")
    jst_time = utc_time + timedelta(hours=9)
    return jst_time.strftime("%Y-%m-%d %H:%M:%S")
# Download a video thumbnail and save it locally
def download_thumbnail(thumbnail_url, thumbnail_dir, video_id):
    try:
        response = requests.get(thumbnail_url)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.RequestException as e:
        print(f"Error occurred while downloading thumbnail: {e}")
        return None
    if response.ok:
        # Extract the file extension (e.g., jpg, png)
        file_extension = thumbnail_url.split('.')[-1]
        if len(file_extension) > 5:  # Handle overly long extensions (e.g., URL parameters)
            file_extension = 'jpg'  # Default to jpg
        # Construct the file path for saving
        file_path = os.path.join(thumbnail_dir, f"{video_id}.{file_extension}")
        try:
            # Save the image data to the file
            with open(file_path, 'wb') as file:
                file.write(response.content)
            return file_path
        except IOError as io_error:
            print(f"Error occurred while saving the file: {io_error}")
            return None
    else:
        print(f"Unexpected status code received: {response.status_code}")
        return None
# Retrieve all comments from a specific video
def get_all_video_comments(video_id, API_KEY):
    comment_url = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comments = []
    page_token = None
    max_retries = 3
    # Default value when no comments are available
    empty_comment = {
        'comment': '0',
        'comment_like_count': 0,
        'comment_author_channel_id': '0',
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': 'yes'
    }
    # Default value for disabled comment sections
    disabled_comment = {
        'comment': pd.NA,
        'comment_like_count': pd.NA,
        'comment_author_channel_id': pd.NA,
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': 'no'
    }
    # Default value for inaccessible comments
    delete_comment = {
        'comment': pd.NA,
        'comment_like_count': pd.NA,
        'comment_author_channel_id': pd.NA,
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': pd.NA
    }

    # Handle API errors and decide on appropriate actions
    def handle_api_error(response):
        try:
            error_response = response.json()
            error_info = error_response.get('error', {})
            error_reason = ''
            if 'errors' in error_info and error_info['errors']:
                error_reason = error_info['errors'][0].get('reason', '')
        except ValueError:
            # Retry if JSON decoding fails
            return 'retry'
        status = response.status_code
        # Handle specific error codes and reasons
        if status == 403:
            if error_reason == 'commentsDisabled':
                return 'disabled'
            elif error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
                print(f"API quota exceeded. Waiting for 24 hours...")
                show_now_time()
                time.sleep(86400)
                return 'retry'
            elif error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
                print(f"Rate limit exceeded. Waiting for 2 minutes...")
                show_now_time()
                time.sleep(120)
                return 'retry'
            else:
                return 'abort'
        elif status == 404:
            return 'not_found'
        elif status == 429:
            print("Too many requests. Waiting for 70 seconds...")
            show_now_time()
            time.sleep(70)
            return 'retry'
        else:
            print(f"Unexpected API error: {status}, reason: {error_reason}")
            time.sleep(70)
            return 'retry'

    while True:
        comment_params = {
            'part': 'snippet,replies',
            'videoId': video_id,
            'key': API_KEY,
            'maxResults': 100,
            'pageToken': page_token
        }
        for attempt in range(max_retries):
            comment_response = requests.get(comment_url, params=comment_params)
            if comment_response.status_code == 200:
                # Parse the JSON response
                comment_data = comment_response.json()
                break
            else:
                # Handle the error based on its type
                error_action = handle_api_error(comment_response)
                if error_action == 'retry':
                    continue
                elif error_action == 'disabled':
                    return [disabled_comment]
                elif error_action == 'not_found':
                    return [delete_comment]
                elif error_action == 'abort':
                    return [delete_comment]
        else:
            # If retries fail, return a default response
            print(f"Maximum retries reached.")
            return [delete_comment]
        # Extract comments and their replies
        if 'items' in comment_data and comment_data['items']:
            for item in comment_data['items']:
                parent_comment = item['snippet']['topLevelComment']['snippet']
                comment_entry = {
                    'comment': parent_comment['textDisplay'],
                    'comment_like_count': parent_comment['likeCount'],
                    'comment_author_channel_id': parent_comment.get('authorChannelId', {}).get('value'),
                    'comment_published_at': parent_comment['publishedAt'],
                    'updated_at': parent_comment.get('updatedAt', None),
                    'is_comment_available': 'yes',
                    'has_reply': 'no',
                    'reply_comment': pd.NA,
                    'reply_like_count': pd.NA,
                    'reply_author_channel_id': pd.NA,
                    'reply_published_at': pd.NA,
                    'reply_updated_at': pd.NA
                }
                # Process replies if available
                if 'replies' in item:
                    for reply in item['replies']['comments']:
                        reply_snippet = reply['snippet']
                        comment_entry.update({
                            'reply_comment': reply_snippet['textDisplay'],
                            'reply_like_count': reply_snippet['likeCount'],
                            'reply_author_channel_id': reply_snippet.get('authorChannelId', {}).get('value'),
                            'reply_published_at': reply_snippet['publishedAt'],
                            'reply_updated_at': reply_snippet.get('updatedAt', None),
                            'has_reply': 'yes'
                        })
                        comments.append(comment_entry.copy())
                else:
                    comments.append(comment_entry)
        else:
            # Return a default response if no comments are retrieved
            return [empty_comment]
        # Fetch the next page of comments if available
        page_token = comment_data.get('nextPageToken')
        if not page_token:
            break
        time.sleep(3)
    # Return a default value if no comments were retrieved
    if not comments:
        return [empty_comment]
    return comments
# Retrieve metadata for a batch of video IDs
def get_video_data(video_ids, API_KEY):
    video_url = 'https://www.googleapis.com/youtube/v3/videos'
    max_retries = 3
    videos_info = {}
    # Process video IDs in batches of 50
    for start in range(0, len(video_ids), 50):
        batch_video_ids = video_ids[start:start+50]
        video_params = {
            'part': 'snippet,statistics',
            'id': ','.join(batch_video_ids),
            'key': API_KEY
        }
        for attempt in range(max_retries):
            try:
                video_response = requests.get(video_url, params=video_params)
                if video_response.status_code == 200:
                    video_data = video_response.json()
                    fetched_ids = set()
                    if 'items' in video_data:
                        for item in video_data['items']:
                            video_id = item.get('id', None)
                            fetched_ids.add(video_id)
                            # Safely retrieve snippet and statistics
                            snippet = item.get('snippet', {})
                            statistics = item.get('statistics', {})
                            title = snippet.get('title', '')
                            description = snippet.get('description', '')
                            like_count = int(statistics.get('likeCount', 0))
                            view_count = int(statistics.get('viewCount', 0))
                            comment_count = int(statistics.get('commentCount', 0))
                            published_at = convert_to_jst(snippet.get('publishedAt', ''))
                            channel_id = snippet.get('channelId', '')
                            channel_title = snippet.get('channelTitle', '')
                            thumbnail_url = snippet.get('thumbnails', {}).get('default', {}).get('url', pd.NA)
                            # Download the thumbnail if available
                            if pd.notna(thumbnail_url):
                                thumbnail_path = download_thumbnail(thumbnail_url, thumbnail_dir, video_id)
                            else:
                                thumbnail_path = None
                            # Store video information
                            videos_info[video_id] = {
                                'video_id': video_id,
                                'title': title,
                                'description': description,
                                'like_count': like_count,
                                'view_count': view_count,
                                'comment_count': comment_count,
                                'published_at': published_at,
                                'channel_id': channel_id,
                                'channel_title': channel_title,
                                'thumbnail_url': thumbnail_url,
                                'thumbnail_path': thumbnail_path,
                            }
                    # Handle missing video IDs
                    missing_ids = set(batch_video_ids) - fetched_ids
                    for mid in missing_ids:
                        videos_info[mid] = None
                    break
                else:
                    # Parse error response and handle specific cases
                    error_response = video_response.json()
                    error_info = error_response.get('error', {})
                    error_reason = ''
                    if 'errors' in error_info and error_info['errors']:
                        error_reason = error_info['errors'][0].get('reason', '')
                    if video_response.status_code == 403 and error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
                        print("API quota exceeded. Waiting for 24 hours...")
                        show_now_time()
                        time.sleep(86400)
                        continue
                    elif video_response.status_code == 403 and error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
                        print("Rate limit exceeded. Waiting for 2 minutes...")
                        time.sleep(120)
                        continue
                    elif video_response.status_code == 404:
                        print(f"Video not found or invalid resource: batch={batch_video_ids}")
                        break
                    else:
                        print(f"Unexpected API error: {video_response.status_code}, reason: {error_reason}")
                        time.sleep(120)
                        continue
            except ValueError:
                print("Failed to decode API response JSON. Retrying...")
                time.sleep(70)
                continue
    return videos_info if videos_info else None
# Process videos in batches and collect all data
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
    all_data = []
    all_video_ids = video_id_list[:]
    while all_video_ids:
        batch_video_ids = all_video_ids[:batch_size]
        all_video_ids = all_video_ids[batch_size:]
        videos_info = get_video_data(batch_video_ids, API_KEY)
        if videos_info is None:
            continue
        for video_id in batch_video_ids:
            video_data = videos_info.get(video_id) if videos_info else None
            if video_data:
                comments = get_all_video_comments(video_id, API_KEY)
                for comment in comments:
                    video_entry = video_data.copy()
                    video_entry.update({
                        'comment': comment.get('comment', pd.NA),
                        'comment_like_count': comment.get('comment_like_count', pd.NA),
                        'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
                        'comment_published_at': comment.get('comment_published_at', pd.NA),
                        'updated_at': comment.get('updated_at', pd.NA),
                        'has_reply': comment.get('has_reply', 'no'),
                        'reply_comment': comment.get('reply_comment', pd.NA),
                        'reply_like_count': comment.get('reply_like_count', pd.NA),
                        'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
                        'reply_published_at': comment.get('reply_published_at', pd.NA),
                        'reply_updated_at': comment.get('reply_updated_at', pd.NA)
                    })
                    all_data.append(video_entry)
    return all_data
# Save the collected data to a CSV file
def save_all_data_to_csv(all_data, output_csv_path):
    final_df = pd.DataFrame(all_data)
    final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
# Process a single CSV file and save the results
def process_single_file(file_path, output_dir, batch_size, API_KEY):
    df = pd.read_csv(file_path, encoding='utf-8')
    video_ids = list(df['related_video_id'])
    num_video = len(video_ids)
    if num_video == 200:
        all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
        output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
        save_all_data_to_csv(all_data, output_csv_path)
    return file_path
# List input files and prepare for parallel processing
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]
# Execute tasks in parallel
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    for file_path in file_paths:
        future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
        futures.append(future)
    for f in tqdm(as_completed(futures)):
        try:
            finished_file = f.result()
        except Exception as e:
            print("Error happened in worker:", e)