I am working on a project that involves processing audio files and generating transcripts using Celery tasks. The tasks are defined in the workers/process.py file. I have a generate_transcript task that takes various parameters such as names_filename, audio_filename, task_id, and url. This task is responsible for loading speaker names from a JSON file, performing speaker diarization on the audio file, transcribing audio segments using OpenAI's Whisper API, and sending the generated transcriptions to an API endpoint.
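The transcription step combines two libraries that use different time units. pyannote.audio reports diarization segment boundaries in seconds, while pydub's `AudioSegment` is sliced in milliseconds (`audio[start_ms:end_ms]`). Below is a minimal sketch of that conversion. It assumes the diarization turns have already been flattened into `(start_seconds, end_seconds, speaker)` tuples. The helper name `turns_to_ms_slices` and the `min_ms` filter are hypothetical and are not part of either library:

```python
from typing import Iterable, List, Tuple

# Hypothetical shape of one diarization turn: (start_seconds, end_seconds, speaker_label).
Turn = Tuple[float, float, str]


def turns_to_ms_slices(turns: Iterable[Turn], min_ms: int = 0) -> List[Tuple[int, int, str]]:
    """Convert second-based diarization turns into millisecond slice bounds.

    pydub slices audio in milliseconds, so each (start, end) pair in seconds
    is rounded to whole milliseconds. Turns whose length is not greater than
    min_ms (including empty turns) are dropped.
    """
    slices = []
    for start_s, end_s, speaker in turns:
        start_ms = int(round(start_s * 1000))
        end_ms = int(round(end_s * 1000))
        if end_ms - start_ms > min_ms:
            slices.append((start_ms, end_ms, speaker))
    return slices
```

With pyannote.audio, such tuples can be built from the diarization result as `[(seg.start, seg.end, label) for seg, _, label in diarization.itertracks(yield_label=True)]`. Each resulting slice can then be passed on as `audio[start_ms:end_ms]`.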
The issue I am facing is that when I start the Celery worker with the --pool=threads option, tasks get stuck as soon as the worker receives them. The worker appears to be running, but the tasks are never processed, and this is creating a bottleneck in my audio processing pipeline.
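The worker logs show nothing, so one way to find where a received task is stuck is to dump the stack of every thread in the worker process. This is a minimal stdlib sketch, and the function name is hypothetical. It relies on `sys._current_frames()`, which is a CPython-specific API:

```python
import sys
import threading
import traceback


def format_all_thread_stacks() -> str:
    """Return the current call stack of every live thread as one string.

    In a --pool=threads worker, this shows which call each task thread is
    blocked in when a task has been received but never finishes.
    """
    names = {t.ident: t.name for t in threading.enumerate()}
    chunks = []
    for thread_id, frame in sys._current_frames().items():
        name = names.get(thread_id, "unknown")
        chunks.append(f"--- Thread {name} (id={thread_id}) ---\n")
        chunks.append("".join(traceback.format_stack(frame)))
    return "".join(chunks)
```

You could call this function from a timer thread. Alternatively, the standard library's `faulthandler.dump_traceback_later(timeout, repeat=True)` writes the tracebacks of all threads to stderr every `timeout` seconds without any custom code.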
What I've Tried:
I have checked the Celery worker logs for any error messages or indications of issues, but I haven't found any relevant information. I have ensured that all the required dependencies, including pyannote.audio, are installed and available to the worker.
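To narrow down which step blocks (loading names, diarization, audio decoding, or the Whisper calls), you could wrap each step in a timing log. This is a minimal sketch. It assumes that output from the standard `logging` module reaches the worker log at `--loglevel=info`; inside tasks, Celery's `celery.utils.log.get_task_logger(__name__)` is the usual alternative. `log_step` is a hypothetical helper:

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator

logger = logging.getLogger("workers.process")


@contextmanager
def log_step(step: str) -> Iterator[None]:
    """Log when a step starts, and whether it finished or failed and after how long.

    If a task hangs, the last "started" line without a matching
    "finished"/"failed" line names the step that never returned.
    """
    start = time.monotonic()
    logger.info("%s started", step)
    try:
        yield
    except Exception:
        # Log with traceback, then let the exception propagate to the task.
        logger.exception("%s failed after %.2fs", step, time.monotonic() - start)
        raise
    logger.info("%s finished in %.2fs", step, time.monotonic() - start)
```

Inside the task, this would look like `with log_step("diarization"): diarization = diarize_audio(audio_path)`. If the last log line is `diarization started`, the hang is inside the pyannote call.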
Here's the relevant part of my Dockerfile:
```dockerfile
CMD ["celery", "-A", "workers.process", "worker", "--pool=threads", "--loglevel=info"]
```
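With `--pool=threads`, every task in a worker process runs in the same interpreter. A module-level object such as the diarization `pipeline` is therefore shared by all concurrent tasks. It would be an assumption to treat pyannote.audio's pipeline as safe to call from several threads at once, so this is worth ruling out. The following minimal sketch loads the model once, lazily, and serializes calls with a lock. `SharedModel` and the `loader` callable are hypothetical:

```python
import threading
from typing import Any, Callable, Optional


class SharedModel:
    """Load a heavy model once per process and serialize calls to it.

    The lock makes both the one-time load and every call mutually
    exclusive, so two task threads never run the model concurrently.
    """

    def __init__(self, loader: Callable[[], Callable[..., Any]]) -> None:
        self._loader = loader
        self._model: Optional[Callable[..., Any]] = None
        self._lock = threading.Lock()

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        with self._lock:
            if self._model is None:
                self._model = self._loader()
            return self._model(*args, **kwargs)
```

Usage would be `pipeline = SharedModel(load_diarization_pipeline)`, where `load_diarization_pipeline` is a hypothetical function that builds the pyannote pipeline. The trade-off is that diarization then runs one task at a time per worker process. If that is too slow, a process-based pool such as `--pool=prefork` (or `--pool=solo`, for one task at a time) lets each process hold its own copy of the model instead.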
Here's a simplified version of the generate_transcript task:
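```python
import os

from celery import shared_task
from pydub import AudioSegment

# recordings_directory, openai_client, pipeline (presumably the pyannote.audio
# diarization pipeline), load_json, transcribe_segments and match_speaker are
# defined elsewhere in workers/process.py.


@shared_task(bind=True, max_retries=1)
def generate_transcript(
    self,
    names_filename,
    audio_filename,
    task_id,
    url,
):
    try:
        audio_path = os.path.join(recordings_directory, audio_filename)
        names_path = os.path.join(recordings_directory, names_filename)
        transcriptions = []
        # Speaker names supplied alongside the recording.
        names = load_json(names_path)
        # Speaker diarization with pyannote.audio.
        diarization = diarize_audio(audio_path)
        audio = AudioSegment.from_file(audio_path)
        duration = audio.duration_seconds
        # Transcribe each diarized segment with OpenAI's Whisper API.
        transcriptions = transcribe_segments(audio, diarization, openai_client)
        if len(names) > 0:
            matching_speaker_transcriptions = match_speaker(transcriptions, names)
        else:
            matching_speaker_transcriptions = transcriptions
        # ... (rest of the code for generating and sending transcriptions)
    except Exception as e:
        print(f"Error processing audio: {e}")
        raise self.retry(exc=e)


def diarize_audio(audio_path):
    try:
        diarization = pipeline(audio_path)
        return diarization
    except Exception as e:
        print(f"Error during speaker diarization: {e}")
        # Wrap and chain the original error. Writing `raise e("...")` would
        # try to call the exception instance and raise a TypeError instead.
        raise RuntimeError("Error during speaker diarization") from e
```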
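In the original `diarize_audio`, `raise e ("...")` calls the caught exception instance. The handler itself therefore fails with `TypeError: '<ExceptionType>' object is not callable`, and that message, not the diarization error, is what `generate_transcript` prints. Below is a minimal sketch of the chained form. The pipeline is passed in as a parameter so the function can be exercised without pyannote, and `diarize_audio_checked` is a hypothetical name:

```python
from typing import Any, Callable


def diarize_audio_checked(pipeline: Callable[[str], Any], audio_path: str) -> Any:
    """Run the diarization pipeline, wrapping any failure with context."""
    try:
        return pipeline(audio_path)
    except Exception as e:
        # `from e` stores the original error as __cause__, so the traceback
        # shows both the wrapper message and the real failure.
        raise RuntimeError(
            f"Error during speaker diarization of {audio_path}"
        ) from e
```

`self.retry(exc=e)` in the task then receives the `RuntimeError`, and its `__cause__` still carries the original exception.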