Mehn

Celery tasks stuck when using --pool=threads option

I am working on a project that processes audio files and generates transcripts with Celery tasks. The tasks are defined in workers/process.py. The generate_transcript task takes names_filename, audio_filename, task_id, and url. It loads speaker names from a JSON file, performs speaker diarization on the audio file, transcribes the audio segments with OpenAI's Whisper API, and sends the resulting transcriptions to an API endpoint.

The issue is that when I start the Celery worker with the --pool=threads option, tasks get stuck as soon as the worker receives them. The worker appears to be running, but the tasks are never processed, which creates a bottleneck in my audio processing pipeline.

What I've Tried:

I have checked the Celery worker logs for error messages, but found nothing relevant. I have also confirmed that all required dependencies, including pyannote.audio, are installed and available to the worker.
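Since the logs show nothing, one way to see where a stuck task is blocked is to dump every thread's stack once a step runs longer than expected. The following is a minimal sketch using only the standard library's faulthandler module; hang_watchdog and demo are hypothetical names for illustration, and the timeout values are arbitrary.

```python
import faulthandler
import sys
import tempfile
import time
from contextlib import contextmanager


@contextmanager
def hang_watchdog(timeout_s, file=None):
    """Dump all thread stacks to `file` if the body is still running after timeout_s.

    hang_watchdog is a hypothetical helper name, not part of Celery.
    """
    target = file if file is not None else sys.stderr
    faulthandler.dump_traceback_later(timeout_s, file=target, exit=False)
    try:
        yield
    finally:
        # Always disarm the timer, whether the body finished or raised.
        faulthandler.cancel_dump_traceback_later()


def demo():
    """Simulate a slow step and return whatever the watchdog wrote."""
    with tempfile.TemporaryFile(mode="w+") as dump_file:
        with hang_watchdog(0.1, file=dump_file):
            time.sleep(0.5)  # stand-in for a call that hangs
        dump_file.seek(0)
        return dump_file.read()


if __name__ == "__main__":
    print(demo())
```

In the task, the body of generate_transcript could be wrapped in `with hang_watchdog(60):` so that a stall prints the stack of every worker thread to stderr. The faulthandler timer is process-wide, so with a thread pool running several tasks at once, a later call replaces an earlier one; this sketch is most useful with a single concurrent task.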

Here's the relevant part of my Dockerfile:

CMD ["celery", "-A", "workers.process", "worker", "--pool=threads", "--loglevel=info"]

Here's a simplified version of the generate_transcript task:

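import os

from celery import shared_task
from pydub import AudioSegment  # assumed source of AudioSegment

# recordings_directory, load_json, transcribe_segments, match_speaker,
# openai_client, and pipeline are defined elsewhere in the project.

@shared_task(bind=True, max_retries=1)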
def generate_transcript(
    self,
    names_filename,
    audio_filename,
    task_id,
    url,
):
    try:
        audio_path = os.path.join(recordings_directory, audio_filename)
        names_path = os.path.join(recordings_directory, names_filename)
        
        transcriptions = []
        names = load_json(names_path)
        diarization = diarize_audio(audio_path)
        audio = AudioSegment.from_file(audio_path)
        duration = audio.duration_seconds
        transcriptions = transcribe_segments(audio, diarization, openai_client)
        
        if len(names) > 0:
            matching_speaker_transcriptions = match_speaker(transcriptions, names)
        else:
            matching_speaker_transcriptions = transcriptions
        
        # ... (rest of the code for generating and sending transcriptions)
        
    except Exception as e:
        print(f"Error processing audio: {e}")
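        raise self.retry(exc=e)


def diarize_audio(audio_path):
    try:
        # pipeline is the speaker diarization pipeline loaded elsewhere
        diarization = pipeline(audio_path)
        return diarization
    except Exception as e:
        print(f"Error during speaker diarization: {e}")
        # Wrap the original error; calling the exception instance as e("...") raises a TypeError
        raise RuntimeError("Error during speaker diarization") from e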
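Because the task only prints on failure, the worker log gives no hint about which step a stuck task reached. A minimal sketch of per-stage logging follows, assuming the standard logging module is acceptable here; stage is a hypothetical helper and the logger name is arbitrary.

```python
import logging
import time
from contextlib import contextmanager

# Arbitrary logger name chosen for this sketch.
logger = logging.getLogger("generate_transcript")


@contextmanager
def stage(name, task_id=None):
    """Log when a pipeline stage starts, finishes, or fails, with elapsed time."""
    start = time.monotonic()
    logger.info("task=%s stage=%s started", task_id, name)
    try:
        yield
    except Exception:
        elapsed = time.monotonic() - start
        logger.exception("task=%s stage=%s failed after %.2fs", task_id, name, elapsed)
        raise
    else:
        elapsed = time.monotonic() - start
        logger.info("task=%s stage=%s finished in %.2fs", task_id, name, elapsed)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with stage("diarize", task_id="demo"):
        time.sleep(0.1)  # stand-in for diarize_audio(audio_path)
```

Inside generate_transcript, each step could be wrapped the same way, for example `with stage("diarize", task_id): diarization = diarize_audio(audio_path)`. A "started" line with no matching "finished" line then points at the step where the task is stuck. Celery also provides get_task_logger in celery.utils.log, which could replace the plain logger used here.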
