
Some CSV outputs are empty in parallel Python processing, even though the code path should be triggered

I have a Python script that processes a batch of CSV files in parallel. Each input CSV has exactly 200 rows, so I use an if num_video == 200: check before running code like:

if num_video == 200:
    all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
    output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
    save_all_data_to_csv(all_data, output_csv_path)

Inside process_videos_in_batches, I add debug lines:

print(f"Debug: {len(all_data)}")
print(f"Debug: {all_data[:1]}")

And inside save_all_data_to_csv:

final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print("Debug: writing CSV to", output_csv_path)

For some of the output CSV files, I see the debug prints and the file contains many rows, as expected. For other files, however, the CSV is created but ends up empty. Even more strangely, the debug prints for these “empty” files never appear, so it looks like that section of the code isn't fully running, yet a CSV still appears. It also seems unlikely that the processing inside process_videos_in_batches would produce a CSV with 0 rows, but in reality some output CSV files do have 0 rows (the full code of the main functions and parts of the outputs are provided at the end). I have confirmed that files where the lines

print(f"Debug: {len(all_data)}")  
print(f"Debug: {all_data[:1]}") 

are correctly printed have content, whereas files where these lines are not printed are empty.
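
To rule out the possibility that the prints are simply buffered or lost in the child processes rather than never executed, I am considering a small file-based diagnostic along the lines of the sketch below. The write_marker helper and marker_dir are placeholders, not part of my actual code:

import os

# Placeholder diagnostic: append a line to a per-input-file log from inside the
# worker, so progress is recorded even if stdout from child processes is lost.
def write_marker(marker_dir, file_path, message):
    os.makedirs(marker_dir, exist_ok=True)
    marker_path = os.path.join(marker_dir, os.path.basename(file_path) + '.log')
    with open(marker_path, 'a', encoding='utf-8') as log_file:
        log_file.write(message + '\n')

# Intended usage inside process_single_file (sketch):
#     write_marker(marker_dir, file_path, f'num_video={num_video}')
#     all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
#     write_marker(marker_dir, file_path, f'len(all_data)={len(all_data)}')

A marker file that stops partway through would at least show how far each worker got.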

My environment is:

Why might some files run the full code path (with debug prints and data in the CSV) while others apparently skip the debug prints and produce an empty CSV—despite the if condition being satisfied for all files? Could something in my parallel execution be causing the function to exit early or fail silently for certain files, even though the CSV is still created?
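
For reference, here is a minimal toy example of what I mean by "fail silently": as far as I understand, an exception raised inside a ProcessPoolExecutor worker prints no traceback in the child process and only resurfaces when result() is called on the future. The failing_task function and the temporary directory are made up purely for illustration:

import os
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed

# Toy worker: it creates an output file first and then raises, mimicking a task
# that leaves a file behind without ever finishing its work.
def failing_task(i, out_dir):
    out_path = os.path.join(out_dir, f'{i}.csv')
    open(out_path, 'w').close()           # the file is created...
    raise RuntimeError(f'task {i} died')  # ...but the task never completes

if __name__ == '__main__':
    out_dir = tempfile.mkdtemp()
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(failing_task, i, out_dir) for i in range(3)]
        for f in as_completed(futures):
            try:
                f.result()
            except Exception as e:
                # The exception only shows up here, in the parent process
                print('Error happened in worker:', e)

In this toy, the files exist but are empty, and the only visible sign of the failure is the message printed from result().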

Any tips or insights would be greatly appreciated. Thanks in advance!

--- the full code of the main functions ---

process_videos_in_batches

def process_videos_in_batches(video_id_list, batch_size, API_KEY):
    all_data = []
    all_video_ids = video_id_list[:]

    while all_video_ids:
        batch_video_ids = all_video_ids[:batch_size]
        all_video_ids = all_video_ids[batch_size:]

        videos_info = get_video_data(batch_video_ids, API_KEY)
        
        ### Debug ###
        print(f'Debug: {videos_info}, {batch_video_ids}')
        #############
        
        # If videos_info is None, move on to the next batch
        if videos_info is None:
            continue

        for video_id in batch_video_ids:
            video_data = videos_info.get(video_id) if videos_info else None
            if video_data:
                comments = get_all_video_comments(video_id, API_KEY)
                for comment in comments:
                    # Expand the parent comment information into the video entry
                    video_entry = video_data.copy()
                    video_entry.update({
                        'comment': comment.get('comment', pd.NA),
                        'comment_like_count': comment.get('comment_like_count', pd.NA),
                        'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
                        'comment_published_at': comment.get('comment_published_at', pd.NA),
                        'updated_at': comment.get('updated_at', pd.NA),
                        'has_reply': comment.get('has_reply', 'no'),
                        'reply_comment': comment.get('reply_comment', pd.NA),
                        'reply_like_count': comment.get('reply_like_count', pd.NA),
                        'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
                        'reply_published_at': comment.get('reply_published_at', pd.NA),
                        'reply_updated_at': comment.get('reply_updated_at', pd.NA)
                    })
                    all_data.append(video_entry)
    ### Debug ###
    print(f'Debug: {len(all_data)}')
    print(f'Debug: {all_data[:1]}')
    #############
    return all_data

save_all_data_to_csv

def save_all_data_to_csv(all_data, output_csv_path):
    # Convert to a DataFrame
    final_df = pd.DataFrame(all_data)

    # Save to CSV
    final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
    ### Debug ###
    print("Debug: writing CSV to", output_csv_path)
    #############

process_single_file

def process_single_file(file_path, output_dir, batch_size, API_KEY):
    """
    Process a single CSV file and, where applicable, write the result to a CSV file.
    """
    df = pd.read_csv(file_path, encoding='utf-8')
    video_ids = list(df['related_video_id'])
    num_video = len(video_ids)
    if num_video == 200:
        all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
        output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
        save_all_data_to_csv(all_data, output_csv_path)
    
    # Return the file path when done (serves as a log)
    return file_path

The main process:

input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# Run the tasks in parallel
num_workers = 4

futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    for file_path in file_paths:
        # Run process_single_file in parallel
        future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
        futures.append(future)

    # Retrieve the results in the order the tasks finish
    # To track how many tasks have finished with tqdm, wrap the progress bar around as_completed
    for f in tqdm(as_completed(futures)):
        try:
            finished_file = f.result()
        except Exception as e:
            print("Error happened in worker:", e)

Outputs:

Debug: {'lAtasG8EVEg': {'video_id': 'lAtasG8EVEg'...
...
Debug: 22058
Debug: [{'video_id': '8BtA6fO93_w'...
Debug: writing CSV to /home/foo/mnt/vt/related_videos/data_info/C_dsxOR9JJw.csv

Minimal Reproducible Example

import os
import time
from concurrent.futures import ProcessPoolExecutor

def example_task(x):
    time.sleep(1)  # Simulate some work
    return x ** 2

def process_files(files, num_workers):
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(example_task, files))
    return results

if __name__ == "__main__":
    # Input data to simulate files
    files = [1, 2, 3, 4, 5]

    # Test with multiprocessing
    num_workers = 4  # Adjust the number of workers
    try:
        output = process_files(files, num_workers)
        print("Output:", output)
    except Exception as e:
        print("Error:", e)

Full Code

import os
import requests
import pandas as pd
import time
from tqdm.notebook import tqdm
from datetime import datetime, timedelta, timezone
from concurrent.futures import ProcessPoolExecutor, as_completed

# Retrieve API key and paths from environment variables
API_KEY = os.getenv('YOUTUBE_API_KEY')
batch_size = 50
mnt_path = os.getenv('MNT_PATH')
root_dir = os.path.join(mnt_path, 'related_videos')

# Create directories for thumbnails and output data
thumbnail_dir = os.path.join(root_dir, 'thumbnails')
os.makedirs(thumbnail_dir, exist_ok=True)
output_dir = os.path.join(root_dir, 'data_info')
os.makedirs(output_dir, exist_ok=True)

# Check if the output directory exists
if not os.path.exists(output_dir):
    print(f"Failed to create directory: {output_dir}")

# Print the current time in JST (Japan Standard Time)
def show_now_time():
    jst = timezone(timedelta(hours=9))
    jst_time = datetime.now(jst)
    print(jst_time)

# Convert UTC time string to JST time string
def convert_to_jst(utc_time_str):
    utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%SZ")
    jst_time = utc_time + timedelta(hours=9)
    return jst_time.strftime("%Y-%m-%d %H:%M:%S")

# Download a video thumbnail and save it locally
def download_thumbnail(thumbnail_url, thumbnail_dir, video_id):
    try:
        response = requests.get(thumbnail_url)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.RequestException as e:
        print(f"Error occurred while downloading thumbnail: {e}")
        return None

    if response.ok:
        # Extract the file extension (e.g., jpg, png)
        file_extension = thumbnail_url.split('.')[-1]
        if len(file_extension) > 5:  # Handle overly long extensions (e.g., URL parameters)
            file_extension = 'jpg'  # Default to jpg

        # Construct the file path for saving
        file_path = os.path.join(thumbnail_dir, f"{video_id}.{file_extension}")
        try:
            # Save the image data to the file
            with open(file_path, 'wb') as file:
                file.write(response.content)
            return file_path
        except IOError as io_error:
            print(f"Error occurred while saving the file: {io_error}")
            return None
    else:
        print(f"Unexpected status code received: {response.status_code}")
        return None

# Retrieve all comments from a specific video
def get_all_video_comments(video_id, API_KEY):
    comment_url = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comments = []
    page_token = None
    max_retries = 3

    # Default value when no comments are available
    empty_comment = {
        'comment': '0',
        'comment_like_count': 0,
        'comment_author_channel_id': '0',
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': 'yes'
    }

    # Default value for disabled comment sections
    disabled_comment = {
        'comment': pd.NA,
        'comment_like_count': pd.NA,
        'comment_author_channel_id': pd.NA,
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': 'no'
    }

    # Default value for inaccessible comments
    delete_comment = {
        'comment': pd.NA,
        'comment_like_count': pd.NA,
        'comment_author_channel_id': pd.NA,
        'comment_published_at': pd.NA,
        'updated_at': pd.NA,
        'is_comment_available': pd.NA
    }

    # Handle API errors and decide on appropriate actions
    def handle_api_error(response):
        try:
            error_response = response.json()
            error_info = error_response.get('error', {})
            error_reason = ''
            if 'errors' in error_info and error_info['errors']:
                error_reason = error_info['errors'][0].get('reason', '')
        except ValueError:
            # Retry if JSON decoding fails
            return 'retry'

        status = response.status_code

        # Handle specific error codes and reasons
        if status == 403:
            if error_reason == 'commentsDisabled':
                return 'disabled'
            elif error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
                print(f"API quota exceeded. Waiting for 24 hours...")
                show_now_time()
                time.sleep(86400)
                return 'retry'
            elif error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
                print(f"Rate limit exceeded. Waiting for 2 minutes...")
                show_now_time()
                time.sleep(120)
                return 'retry'
            else:
                return 'abort'
        elif status == 404:
            return 'not_found'
        elif status == 429:
            print("Too many requests. Waiting for 70 seconds...")
            show_now_time()
            time.sleep(70)
            return 'retry'
        else:
            print(f"Unexpected API error: {status}, reason: {error_reason}")
            time.sleep(70)
            return 'retry'

    while True:
        comment_params = {
            'part': 'snippet,replies',
            'videoId': video_id,
            'key': API_KEY,
            'maxResults': 100,
            'pageToken': page_token
        }

        for attempt in range(max_retries):
            comment_response = requests.get(comment_url, params=comment_params)

            if comment_response.status_code == 200:
                # Parse the JSON response
                comment_data = comment_response.json()
                break
            else:
                # Handle the error based on its type
                error_action = handle_api_error(comment_response)
                if error_action == 'retry':
                    continue
                elif error_action == 'disabled':
                    return [disabled_comment]
                elif error_action == 'not_found':
                    return [delete_comment]
                elif error_action == 'abort':
                    return [delete_comment]

        else:
            # If retries fail, return a default response
            print(f"Maximum retries reached.")
            return [delete_comment]

        # Extract comments and their replies
        if 'items' in comment_data and comment_data['items']:
            for item in comment_data['items']:
                parent_comment = item['snippet']['topLevelComment']['snippet']
                comment_entry = {
                    'comment': parent_comment['textDisplay'],
                    'comment_like_count': parent_comment['likeCount'],
                    'comment_author_channel_id': parent_comment.get('authorChannelId', {}).get('value'),
                    'comment_published_at': parent_comment['publishedAt'],
                    'updated_at': parent_comment.get('updatedAt', None),
                    'is_comment_available': 'yes',
                    'has_reply': 'no',
                    'reply_comment': pd.NA,
                    'reply_like_count': pd.NA,
                    'reply_author_channel_id': pd.NA,
                    'reply_published_at': pd.NA,
                    'reply_updated_at': pd.NA
                }

                # Process replies if available
                if 'replies' in item:
                    for reply in item['replies']['comments']:
                        reply_snippet = reply['snippet']
                        comment_entry.update({
                            'reply_comment': reply_snippet['textDisplay'],
                            'reply_like_count': reply_snippet['likeCount'],
                            'reply_author_channel_id': reply_snippet.get('authorChannelId', {}).get('value'),
                            'reply_published_at': reply_snippet['publishedAt'],
                            'reply_updated_at': reply_snippet.get('updatedAt', None),
                            'has_reply': 'yes'
                        })
                        comments.append(comment_entry.copy())
                else:
                    comments.append(comment_entry)

        else:
            # Return a default response if no comments are retrieved
            return [empty_comment]

        # Fetch the next page of comments if available
        page_token = comment_data.get('nextPageToken')
        if not page_token:
            break

        time.sleep(3)

    # Return a default value if no comments were retrieved
    if not comments:
        return [empty_comment]

    return comments

# Retrieve metadata for a batch of video IDs
def get_video_data(video_ids, API_KEY):
    video_url = 'https://www.googleapis.com/youtube/v3/videos'
    max_retries = 3
    videos_info = {}

    # Process video IDs in batches of 50
    for start in range(0, len(video_ids), 50):
        batch_video_ids = video_ids[start:start+50]
        video_params = {
            'part': 'snippet,statistics',
            'id': ','.join(batch_video_ids),
            'key': API_KEY
        }

        for attempt in range(max_retries):
            try:
                video_response = requests.get(video_url, params=video_params)

                if video_response.status_code == 200:
                    video_data = video_response.json()
                    fetched_ids = set()
                    if 'items' in video_data:
                        for item in video_data['items']:
                            video_id = item.get('id', None)
                            fetched_ids.add(video_id)

                            # Safely retrieve snippet and statistics
                            snippet = item.get('snippet', {})
                            statistics = item.get('statistics', {})

                            title = snippet.get('title', '')
                            description = snippet.get('description', '')
                            like_count = int(statistics.get('likeCount', 0))
                            view_count = int(statistics.get('viewCount', 0))
                            comment_count = int(statistics.get('commentCount', 0))
                            published_at = convert_to_jst(snippet.get('publishedAt', ''))
                            channel_id = snippet.get('channelId', '')
                            channel_title = snippet.get('channelTitle', '')
                            thumbnail_url = snippet.get('thumbnails', {}).get('default', {}).get('url', pd.NA)

                            # Download the thumbnail if available
                            if pd.notna(thumbnail_url):
                                thumbnail_path = download_thumbnail(thumbnail_url, thumbnail_dir, video_id)
                            else:
                                thumbnail_path = None

                            # Store video information
                            videos_info[video_id] = {
                                'video_id': video_id,
                                'title': title,
                                'description': description,
                                'like_count': like_count,
                                'view_count': view_count,
                                'comment_count': comment_count,
                                'published_at': published_at,
                                'channel_id': channel_id,
                                'channel_title': channel_title,
                                'thumbnail_url': thumbnail_url,
                                'thumbnail_path': thumbnail_path,
                            }

                    # Handle missing video IDs
                    missing_ids = set(batch_video_ids) - fetched_ids
                    for mid in missing_ids:
                        videos_info[mid] = None

                    break
                else:
                    # Parse error response and handle specific cases
                    error_response = video_response.json()
                    error_info = error_response.get('error', {})
                    error_reason = ''
                    if 'errors' in error_info and error_info['errors']:
                        error_reason = error_info['errors'][0].get('reason', '')

                    if video_response.status_code == 403 and error_reason in ['quotaExceeded', 'dailyLimitExceeded']:
                        print("API quota exceeded. Waiting for 24 hours...")
                        show_now_time()
                        time.sleep(86400)
                        continue
                    elif video_response.status_code == 403 and error_reason in ['rateLimitExceeded', 'userRateLimitExceeded']:
                        print("Rate limit exceeded. Waiting for 2 minutes...")
                        time.sleep(120)
                        continue
                    elif video_response.status_code == 404:
                        print(f"Video not found or invalid resource: batch={batch_video_ids}")
                        break
                    else:
                        print(f"Unexpected API error: {video_response.status_code}, reason: {error_reason}")
                        time.sleep(120)
                        continue

            except ValueError:
                print("Failed to decode API response JSON. Retrying...")
                time.sleep(70)
                continue

    return videos_info if videos_info else None

# Process videos in batches and collect all data
def process_videos_in_batches(video_id_list, batch_size, API_KEY):
    all_data = []
    all_video_ids = video_id_list[:]

    while all_video_ids:
        batch_video_ids = all_video_ids[:batch_size]
        all_video_ids = all_video_ids[batch_size:]

        videos_info = get_video_data(batch_video_ids, API_KEY)
        
        if videos_info is None:
            continue

        for video_id in batch_video_ids:
            video_data = videos_info.get(video_id) if videos_info else None
            if video_data:
                comments = get_all_video_comments(video_id, API_KEY)
                for comment in comments:
                    video_entry = video_data.copy()
                    video_entry.update({
                        'comment': comment.get('comment', pd.NA),
                        'comment_like_count': comment.get('comment_like_count', pd.NA),
                        'comment_author_channel_id': comment.get('comment_author_channel_id', pd.NA),
                        'comment_published_at': comment.get('comment_published_at', pd.NA),
                        'updated_at': comment.get('updated_at', pd.NA),
                        'has_reply': comment.get('has_reply', 'no'),
                        'reply_comment': comment.get('reply_comment', pd.NA),
                        'reply_like_count': comment.get('reply_like_count', pd.NA),
                        'reply_author_channel_id': comment.get('reply_author_channel_id', pd.NA),
                        'reply_published_at': comment.get('reply_published_at', pd.NA),
                        'reply_updated_at': comment.get('reply_updated_at', pd.NA)
                    })
                    all_data.append(video_entry)

    return all_data

# Save the collected data to a CSV file
def save_all_data_to_csv(all_data, output_csv_path):
    final_df = pd.DataFrame(all_data)
    final_df.to_csv(output_csv_path, index=False, encoding='utf-8')

# Process a single CSV file and save the results
def process_single_file(file_path, output_dir, batch_size, API_KEY):
    df = pd.read_csv(file_path, encoding='utf-8')
    video_ids = list(df['related_video_id'])
    num_video = len(video_ids)
    if num_video == 200:
        all_data = process_videos_in_batches(video_ids, batch_size, API_KEY)
        output_csv_path = os.path.join(output_dir, os.path.basename(file_path))
        save_all_data_to_csv(all_data, output_csv_path)
    return file_path

# List input files and prepare for parallel processing
input_files = os.listdir(root_dir)
file_paths = [os.path.join(root_dir, f) for f in input_files if os.path.isfile(os.path.join(root_dir, f))]

# Execute tasks in parallel
num_workers = 4
futures = []
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    for file_path in file_paths:
        future = executor.submit(process_single_file, file_path, output_dir, batch_size, API_KEY)
        futures.append(future)

    for f in tqdm(as_completed(futures)):
        try:
            finished_file = f.result()
        except Exception as e:
            print("Error happened in worker:", e)
