Reputation: 91
I implemented a StreamingResponse in FastAPI with audio bytes from async generator sources. But besides need to insert some messages for client side audio player (currently, React Native) just in the stream.
Read about ICY format and it looks like appropriate stuff for this. So what headers are required for stream endpoint and what format a message for AudioPlayer should be to trigger an event (like Event.MetadataCommonReceived
)?
@router.get(
"/stream/{session_id}",
response_class=StreamingResponse,
responses={200: {"content": {"audio/mpeg": {}}, "description": "An audio file in MP3 format"}},
)
async def stream_audio(session_id: str):
...
return StreamingResponse(
stream_from_queue(<some asyncio.Queue>, session_id),
headers={
"content-type": "audio/mpeg",
"icy-metaint": "16000",
"icy-name": "MyAudioStream",
"icy-genre": "Podcast",
"icy-url": "http://localhost:8000"
},
media_type="audio/mpeg",
)
async def stream_from_queue(queue: Queue, session_id: str):
... # get an audio chunk from queue
... # send some metadata
There could be a problem with FastAPI StreamingResponse. It uses chunked transfer encoding. https://medium.com/@ab.hassanein/streaming-responses-in-fastapi-d6a3397a4b7b
Upvotes: 0
Views: 84
Reputation: 91
Base on the protocol description link (thanks to Rauuun) and this nice picture
I implemented this algorithm:
ICY_METADATA_INTERVAL = 16000 # bytes
ICY_BYTES_BLOCK_SIZE = 16 # bytes
ICY_METADATA_SIGNAL = "META_EVENT".encode()
ZERO_BYTE = b"\0"
async def stream_from_queue(queue: Queue, session_id: str):
buffer = b""
...
for chunk in <stream_queue>:
... # get an audio chunk from queue
if chunk == ICY_METADATA_SIGNAL: # if get a special signal we can send some metadata
# flush buffer padded with zeros to ICY_METADATA_INTERVAL length
yield buffer + (ICY_METADATA_INTERVAL - len(buffer)) * ZERO_BYTE
buffer = b""
# send a meta message
yield preprocess_metadata()
else: # send raw audio data
buffer += chunk
if len(buffer) < ICY_METADATA_INTERVAL:
continue
yield buffer[:ICY_METADATA_INTERVAL]
yield ZERO_BYTE # we have to send at least zero byte as metadata after every ICY_METADATA_INTERVAL
buffer = buffer[ICY_METADATA_INTERVAL:]
...
def preprocess_metadata(metadata: str = "META_EVENT") -> bytes:
icy_metadata_formatted = f"StreamTitle='{metadata}';".encode()
icy_metadata_block_length = len(icy_metadata_formatted)
return (
# number of blocks of ICY_BYTES_BLOCK_SIZE needed for this meta message (NOT including this byte)
(1 + (icy_metadata_block_length - 1) // ICY_BYTES_BLOCK_SIZE).to_bytes(1, "big")
# meta message encoded
+ icy_metadata_formatted
# zero-padded tail to fill the last ICY_BYTES_BLOCK_SIZE
+ (ICY_BYTES_BLOCK_SIZE - icy_metadata_block_length % ICY_BYTES_BLOCK_SIZE) * ZERO_BYTE
)
Upvotes: 0
Reputation: 48
After reading https://gist.github.com/niko/2a1d7b2d109ebe7f7ca2f860c3505ef0 it seems that your method stream_from_queue
needs to pad the initial message that you will yield
with the metadata information you desire to send.
I understand the encoding is ascii
Upvotes: 1