Benoit Blanchon

Reputation: 14561

Django 3.1: StreamingHttpResponse with an async generator

Documentation for Django 3.1 says this about async views:

The main benefits are the ability to service hundreds of connections without using Python threads. This allows you to use slow streaming, long-polling, and other exciting response types.

I believe that "slow streaming" means we could implement an SSE view without monopolizing a thread per client, so I tried to sketch a simple view, like so:

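import asyncio
import datetime

from django.http import StreamingHttpResponse


async def stream(request):

    async def event_stream():
        # Emit one SSE frame per second, forever.
        while True:
            yield 'data: The server time is: %s\n\n' % datetime.datetime.now()
            await asyncio.sleep(1)

    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')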

(note: I adapted the code from this response)

Unfortunately, when this view is invoked, it raises the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/asgiref/sync.py", line 330, in thread_handler
    raise exc_info[1]
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/exception.py", line 38, in inner
    response = await get_response(request)
  File "/usr/local/lib/python3.7/site-packages/django/core/handlers/base.py", line 231, in _get_response_async
    response = await wrapped_callback(request, *callback_args, **callback_kwargs)
  File "./chat/views.py", line 144, in watch
    return StreamingHttpResponse(event_stream(), content_type='text/event-stream')
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 367, in __init__
    self.streaming_content = streaming_content
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 382, in streaming_content
    self._set_streaming_content(value)
  File "/usr/local/lib/python3.7/site-packages/django/http/response.py", line 386, in _set_streaming_content
    self._iterator = iter(value)
TypeError: 'async_generator' object is not iterable

To me, this shows that StreamingHttpResponse doesn't currently support async generators.
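
The failure can be reproduced without Django. Here is a minimal sketch using only the standard library (the names ticks, sync_iter_fails and consume are made up for illustration). iter() looks for __iter__, while an async generator only provides __aiter__ and __anext__, so it can only be drained with async for:

```python
import asyncio


async def ticks():
    # Async generator: defines __aiter__/__anext__, but not __iter__.
    for i in range(3):
        yield i
        await asyncio.sleep(0)


def sync_iter_fails():
    """Return True if iter() rejects the async generator, as in the traceback."""
    agen = ticks()
    try:
        iter(agen)
    except TypeError:
        return True
    return False


async def consume():
    # The supported way to drain an async generator is `async for`.
    return [i async for i in ticks()]


if __name__ == "__main__":
    print(sync_iter_fails())
    print(asyncio.run(consume()))
```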

I tried to modify StreamingHttpResponse to use async for but I wasn't able to do much.

Any idea how I could do that?

Upvotes: 21

Views: 10087

Answers (5)

Jarym

Reputation: 2380

This is an old question, but it came up in a Google search while I was looking for a solution to the same issue. In the end I found this repo, https://github.com/valberg/django-sse, which uses async views in Django 4.2 to stream via SSE (specifically see here).

I understand this is a recent addition to Django so I hope it helps anyone else looking for an answer.
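
For reference, Django 4.2 and later accept an asynchronous iterator in StreamingHttpResponse when the project is served over ASGI, so the view from the question works largely as written. Here is a minimal sketch under that assumption. The limit and interval parameters are hypothetical additions so the generator can be stopped in a test, and Django is imported inside the view only so the generator can be exercised without Django installed:

```python
import asyncio
import datetime


async def event_stream(limit=None, interval=1.0):
    """Yield one SSE frame per tick; `limit` and `interval` are illustrative."""
    sent = 0
    while limit is None or sent < limit:
        yield "data: The server time is: %s\n\n" % datetime.datetime.now()
        sent += 1
        await asyncio.sleep(interval)


async def stream(request):
    # Assumes Django >= 4.2 running under ASGI (e.g. uvicorn or daphne).
    from django.http import StreamingHttpResponse

    return StreamingHttpResponse(event_stream(), content_type="text/event-stream")
```

Under WSGI, or on older Django versions, the async generator is not consumed this way, which is why the other answers bridge it to a synchronous iterator.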

Upvotes: 4

GrantJ

Reputation: 8719

I created a decorator named stream that wraps an async generator function so that its output can be consumed by Django's StreamingHttpResponse. Here's an example:

import asyncio
import functools

from django.http import StreamingHttpResponse


def stream(coroutine_function):
    @functools.wraps(coroutine_function)
    def wrapper(*args, **kwargs):
        coroutine = coroutine_function(*args, **kwargs)
        try:
            while True:
                yield asyncio.run(coroutine.__anext__())
        except StopAsyncIteration:
            pass
    return wrapper


@stream
async def chunks():
    for char in 'Hello, world!':
        yield char
        await asyncio.sleep(1)


async def index(request):
    return StreamingHttpResponse(chunks())

I also needed to install nest_asyncio and call apply() at the top of the settings.py file:

import nest_asyncio
nest_asyncio.apply()

The nest_asyncio dependency supports calling asyncio.run from the wrapper function created by the stream decorator.
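
If pulling in nest_asyncio is not an option, the same bridging idea can be sketched with a private event loop instead of repeated asyncio.run calls. This is a different technique from the decorator above, not a drop-in replacement: it assumes the consuming thread has no event loop already running (for example a WSGI worker or a background thread), and iter_over_async is a hypothetical helper name.

```python
import asyncio


def iter_over_async(async_iterable):
    """Drive an async iterable from synchronous code, one item at a time.

    Assumption: no event loop is already running in the calling thread;
    otherwise run_until_complete() raises RuntimeError.
    """
    loop = asyncio.new_event_loop()
    iterator = async_iterable.__aiter__()
    try:
        while True:
            try:
                # Run the loop just long enough to produce the next item.
                yield loop.run_until_complete(iterator.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()


async def chunks():
    for char in "Hi!":
        yield char
        await asyncio.sleep(0)


if __name__ == "__main__":
    print(list(iter_over_async(chunks())))
```

Because each item is pulled on demand, the consumer sees the first chunk as soon as it is produced rather than after the whole stream has finished.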

Finally, Django's ASGI application can be served by uvicorn workers under gunicorn:

$ gunicorn -k uvicorn.workers.UvicornWorker www.asgi:application

Upvotes: 0

wowkin2

Reputation: 6355

Another way to do SSE is to use a dedicated library, django-eventstream:

Add the following to the HTML page that will consume the data:

<script src="{% static 'django_eventstream/eventsource.min.js' %}"></script>
<script src="{% static 'django_eventstream/reconnecting-eventsource.js' %}"></script>

var es = new ReconnectingEventSource('/events/');

es.addEventListener('message', function (e) {
    console.log(e.data);
}, false);

es.addEventListener('stream-reset', function (e) {
    // ... client fell behind, reinitialize ...
}, false);

For the backend, you'll need to set up Django properly; after that, you can call the following function from any view, task, signal handler, or other code that needs to emit a server-sent event (SSE):

The following call produces the data (events):

from django_eventstream import send_event

send_event('test', 'message', {'text': 'hello world'})

Upvotes: 0

wowkin2

Reputation: 6355

Honestly, this is not supported natively by Django, but I have a solution for you using Daphne (which is also used by Django Channels).

I created my own StreamingHttpResponse subclass that retrieves the data stream from async code and hands it to the synchronous part of Django.

import asyncio

# By design asyncio does not allow its event loop to be nested.
# Trying to do so will give the error "RuntimeError: This event loop is already running".
# This library solves that problem.
import nest_asyncio

from django.http.response import StreamingHttpResponse


class AsyncStreamingHttpResponse(StreamingHttpResponse):

    def __init__(self, streaming_content=(), *args, **kwargs):
        sync_streaming_content = self.get_sync_iterator(streaming_content)
        super().__init__(streaming_content=sync_streaming_content, *args, **kwargs)

    @staticmethod
    async def convert_async_iterable(stream):
        """Accepts async_generator and async_iterator"""
        return iter([chunk async for chunk in stream])

    def get_sync_iterator(self, async_iterable):
        nest_asyncio.apply()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        result = loop.run_until_complete(self.convert_async_iterable(async_iterable))
        return result
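
One consequence of convert_async_iterable is worth spelling out: the list comprehension drains the entire async iterable inside __init__, so chunks are buffered rather than streamed, and an endless generator such as the while True loop in the question would never return. Here is a small sketch of that behaviour using only the standard library. The names produced, numbers and buffered_iterator are made up for illustration:

```python
import asyncio

produced = []  # records every chunk the async generator has yielded so far


async def numbers(count):
    for i in range(count):
        produced.append(i)
        yield i
        await asyncio.sleep(0)


async def convert_async_iterable(stream):
    # Same approach as the class above: collect everything, then wrap in iter().
    return iter([chunk async for chunk in stream])


def buffered_iterator(count):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(convert_async_iterable(numbers(count)))
    finally:
        loop.close()


if __name__ == "__main__":
    iterator = buffered_iterator(3)
    # All three chunks already exist before the consumer reads the first one.
    print(produced, next(iterator))
```

This is fine for short, finite streams, but for a long-lived SSE connection the chunks need to be pulled one at a time.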

Also, you'll need to run your Django web server with Daphne to support Server-Sent Events (SSE) properly. It is officially supported by the Django Software Foundation and has a syntax similar to gunicorn's, but uses asgi.py instead of wsgi.py.

To use it, install it with: pip install daphne

Then change the command from: python manage.py runserver
to something like: daphne -b 0.0.0.0 -p 8000 sse_demo.asgi:application

I'm not sure whether it will work with gunicorn.

Let me know if you have any more questions.

Upvotes: 6

Adrien

Reputation: 53

It seems you have to use something like Django Channels:

Channels augments Django to bring WebSocket, long-poll HTTP, task offloading and other async support to your code, using familiar Django design patterns and a flexible underlying framework that lets you not only customize behaviours but also write support for your own protocols and needs.

Upvotes: -1
