Googler Thiru
Googler Thiru

Reputation: 3

How to include Pause and Resume Feature with Azure conversation transcriber?

We are using Azure Conversation Transcriber for realtime speech to text with diarization. We need to incorporate the pause_resume feature. We tried different ways but nothing worked.

Azure only provides stop_transcribing_async() function that completely stops the current session.

I have attached the code we tried but it is not working, Any help will be appreciated. I have attached a block of code that contains the logic for pausing and resuming. Please do advice what else method we could follow.

In the below code, we stop the transcriber completely once "pause" message is passed and restart the same once the "resume" message is detected.

async def receive_audio(uuid, path):

    audio_queue = Queue(maxsize=0)
    
    transcriber_state = False
    try:
        conversation_transcriber, push_stream = create_conversation_transcriber(
            CONNECTIONS.connections[uuid]
        )

        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        transcriber_state = True
        
        while True:
            # Receive audio data from the WebSocket
            websocket = CONNECTIONS.connections[uuid]["websocket"]
            data = await websocket.recv()
            
            logger.info(CONNECTIONS.connections[uuid]['state'])
            if isinstance(data, str):

                logger.info(f"Current State: {CONNECTIONS.connections[uuid]['state']}")
                if data == "inactive":
                    logger.info("Pausing the transcriber...")
                    conversation_transcriber.stop_transcribing_async().get()
                    push_stream.close()
                    transcriber_state = False
                
                elif data == "active" and not transcriber_state:
                    logger.info(f"Resuming the transcriber...")
                    conversation_transcriber, push_stream = create_conversation_transcriber()                
                    conversation_transcriber.start_transcribing_async().get()
                    transcriber_state = True
                
                CONNECTIONS.connections[uuid]["state"] = data
                    
            
            if CONNECTIONS.connections[uuid]["state"] == "active":
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS.connections[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)

    except websockets.exceptions.ConnectionClosed as e:
        logger.info("Connection closed")
        logger.info(e)
        conversation_transcriber.stop_transcribing_async().get()
        push_stream.close()
    except Exception as e:
        logger.error(f"Error in receive_audio: {e}")

    finally:
        await websocket.close(code=1000)

Upvotes: 0

Views: 101

Answers (2)

Suresh Chikkam
Suresh Chikkam

Reputation: 3413

Here you can control the flow of audio data by pausing the input stream (i.e., stop feeding audio to the push stream). This simulates a pause in transcription without completely stopping the transcriber session.

  • When we resume, the transcriber can pick up from where it left off, though it's important to check that the SDK can handle this correctly. You may need to investigate how much state is maintained in the transcription session.

App.py:

import logging
import azure.cognitiveservices.speech as speechsdk
import asyncio
from queue import Queue
import websockets

# Assume CONNECTIONS is a global dict to manage websocket connections.
CONNECTIONS = {}

async def receive_audio(uuid, path):
    audio_queue = Queue(maxsize=0)
    transcriber_state = False  # False means transcriber is paused
    conversation_transcriber = None
    push_stream = None

    try:
        # Get the WebSocket connection and initialize the transcriber
        websocket = CONNECTIONS[uuid]["websocket"]
        connection_details = CONNECTIONS[uuid]
        
        conversation_transcriber, push_stream = create_conversation_transcriber(connection_details)

        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        transcriber_state = True
        logging.info("Started transcribing...")

        while True:
            # Receive control messages or audio data
            data = await websocket.recv()

            if isinstance(data, str):
                # Handle 'inactive' and 'active' state changes (pause/resume)
                logging.info(f"Received state: {data}")
                if data == "inactive" and transcriber_state:
                    # Pausing: keep the transcriber alive, but stop sending audio
                    logging.info("Pausing the transcriber... (not stopping)")
                    transcriber_state = False

                elif data == "active" and not transcriber_state:
                    # Resuming: continue sending audio to the transcriber
                    logging.info("Resuming the transcriber...")
                    transcriber_state = True
                
                CONNECTIONS[uuid]["state"] = data
            
            # If transcriber is active, continue pushing audio data
            if CONNECTIONS[uuid]["state"] == "active":
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)  # Keep writing to the open stream

    except websockets.exceptions.ConnectionClosed as e:
        logging.info("WebSocket connection closed.")
        if conversation_transcriber:
            conversation_transcriber.stop_transcribing_async().get()
        if push_stream:
            push_stream.close()
    except Exception as e:
        logging.error(f"Error in receive_audio: {e}")
    finally:
        await websocket.close(code=1000)
        logging.info("WebSocket closed.")

def create_conversation_transcriber(connection_details):
    """Create a conversation transcriber with Azure speech configuration."""
    speech_config = speechsdk.SpeechConfig(
        subscription=connection_details['subscription_key'],
        region=connection_details['region']
    )
    audio_format = speechsdk.audio.AudioStreamFormat(samples_per_second=16000, bits_per_sample=16, channels=1)
    push_stream = speechsdk.audio.PushAudioInputStream(audio_format)
    audio_config = speechsdk.audio.AudioConfig(stream=push_stream)
    
    transcriber = speechsdk.transcription.ConversationTranscriber(speech_config, audio_config)
    
    return transcriber, push_stream

# Helper function to get chunk from queue
def get_chunk_from_queue(q, chunk_size):
    return q.get_nowait()

async def main():
    # Initialize CONNECTIONS dictionary with a dummy WebSocket and credentials
    CONNECTIONS['dummy_uuid'] = {
        'websocket': await websockets.connect('ws://localhost:8000'),  # Example WebSocket endpoint
        'subscription_key': 'your_azure_subscription_key',
        'region': 'your_azure_region',
        'audio_buffer': bytearray(),
        'state': 'active'  # Initial state
    }

    # Start receiving audio for this connection
    await receive_audio('dummy_uuid', 'path/to/audio')

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logging.info("Starting the application...")
    
    # Run the asyncio event loop to execute the main function
    asyncio.run(main())
  • when you receive a "pause" command, you can buffer the incoming audio data and delay pushing it to the transcriber until a "resume" command is received.

  • CONNECTIONS[uuid]["state"] controls the flow of audio to the transcriber. When the state is "inactive," the audio stream is not fed to the transcriber.

Console Log:

![enter image description here](Here you can control the flow of audio data by pausing the input stream (i.e., stop feeding audio to the push stream). This simulates a pause in transcription without completely stopping the transcriber session.

  • When we resume, the transcriber can pick up from where it left off, though it's important to check that the SDK can handle this correctly. You may need to investigate how much state is maintained in the transcription session.

App.py:

import logging
import azure.cognitiveservices.speech as speechsdk
import asyncio
from queue import Queue
import websockets

# Assume CONNECTIONS is a global dict to manage websocket connections.
CONNECTIONS = {}

async def receive_audio(uuid, path):
    audio_queue = Queue(maxsize=0)
    transcriber_state = False  # False means transcriber is paused
    conversation_transcriber = None
    push_stream = None

    try:
        # Get the WebSocket connection and initialize the transcriber
        websocket = CONNECTIONS[uuid]["websocket"]
        connection_details = CONNECTIONS[uuid]
        
        conversation_transcriber, push_stream = create_conversation_transcriber(connection_details)

        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        transcriber_state = True
        logging.info("Started transcribing...")

        while True:
            # Receive control messages or audio data
            data = await websocket.recv()

            if isinstance(data, str):
                # Handle 'inactive' and 'active' state changes (pause/resume)
                logging.info(f"Received state: {data}")
                if data == "inactive" and transcriber_state:
                    # Pausing: keep the transcriber alive, but stop sending audio
                    logging.info("Pausing the transcriber... (not stopping)")
                    transcriber_state = False

                elif data == "active" and not transcriber_state:
                    # Resuming: continue sending audio to the transcriber
                    logging.info("Resuming the transcriber...")
                    transcriber_state = True
                
                CONNECTIONS[uuid]["state"] = data
            
            # If transcriber is active, continue pushing audio data
            if CONNECTIONS[uuid]["state"] == "active":
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)  # Keep writing to the open stream

    except websockets.exceptions.ConnectionClosed as e:
        logging.info("WebSocket connection closed.")
        if conversation_transcriber:
            conversation_transcriber.stop_transcribing_async().get()
        if push_stream:
            push_stream.close()
    except Exception as e:
        logging.error(f"Error in receive_audio: {e}")
    finally:
        await websocket.close(code=1000)
        logging.info("WebSocket closed.")

def create_conversation_transcriber(connection_details):
    """Create a conversation transcriber with Azure speech configuration."""
    speech_config = speechsdk.SpeechConfig(
        subscription=connection_details['subscription_key'],
        region=connection_details['region']
    )
    audio_format = speechsdk.audio.AudioStreamFormat(samples_per_second=16000, bits_per_sample=16, channels=1)
    push_stream = speechsdk.audio.PushAudioInputStream(audio_format)
    audio_config = speechsdk.audio.AudioConfig(stream=push_stream)
    
    transcriber = speechsdk.transcription.ConversationTranscriber(speech_config, audio_config)
    
    return transcriber, push_stream

# Helper function to get chunk from queue
def get_chunk_from_queue(q, chunk_size):
    return q.get_nowait()

async def main():
    # Initialize CONNECTIONS dictionary with a dummy WebSocket and credentials
    CONNECTIONS['dummy_uuid'] = {
        'websocket': await websockets.connect('ws://localhost:8000'),  # Example WebSocket endpoint
        'subscription_key': 'your_azure_subscription_key',
        'region': 'your_azure_region',
        'audio_buffer': bytearray(),
        'state': 'active'  # Initial state
    }

    # Start receiving audio for this connection
    await receive_audio('dummy_uuid', 'path/to/audio')

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logging.info("Starting the application...")
    
    # Run the asyncio event loop to execute the main function
    asyncio.run(main())
  • when you receive a "pause" command, you can buffer the incoming audio data and delay pushing it to the transcriber until a "resume" command is received.

  • CONNECTIONS[uuid]["state"] controls the flow of audio to the transcriber. When the state is "inactive," the audio stream is not fed to the transcriber.

Console Log:

enter image description here)

Upvotes: 0

Joel Odey
Joel Odey

Reputation: 84

Incorporating a pause and resume feature for the Azure Conversation Transcriber requires handling the stop_transcribing_async and start_transcribing_async methods appropriately. Your current approach stops and restarts the transcriber but does it in a way that might cause issues with the state management and the audio queue.

Upvotes: 0

Related Questions