Daniel Messner
Daniel Messner

Reputation: 2567

Deno: Server Sent Events

Server Sent Events are a valuable tool to open a persistent connection to a web server, where the server has the ability to push new data to the client, when available.

Using this technology in Node.js is quite straightforward and can be implemented with the following code example:

#!/usr/bin/env node
'use strict';

const http = (options, listener) => require('http').createServer(listener).listen(options.port);

http({ port: 8080 }, (req, res) => {
  switch (req.url) {
    case '/server-sent-events': {
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache',
      });

      const sendDate = () => res.write(`data: ${new Date()}\n\n`);
      sendDate();
      const interval = setInterval(sendDate, 1000);

      req.on('close', () => clearInterval(interval));
    } break;

    default: {
      res.writeHead(200, {
        'Content-Type': 'text/html; charset=utf-8',
      });
      res.end(`
        <!DOCTYPE html>
        <html>
          <head>
            <title>Server Send Events</title>
            <meta charset="utf-8">
            <script>
              const sse = new EventSource('/server-sent-events');
              sse.onerror = () => document.body.innerHTML = 'Connection Error';
              sse.onmessage = ({ data }) => document.body.innerHTML = data;
            </script>
          </head>
          <body></body>
        </html>
      `);
    }
  }
});

Unfortunately I am not able to achieve the same goal with Deno, as there is no simple write method on the request object, but I guess it has to be implemented somehow using the req.w buffer. Can you help me please finish off the following example code, so the Server Sent Events can be utilised with Deno as well?

#!/usr/bin/env deno run --allow-net

import { listenAndServe as http } from 'https://deno.land/std/http/server.ts';

http({ port: 8080 }, (req) => {
  switch (req.url) {
    case '/server-sent-events': {
      // missing steps:
      // * setup the server sent event headers
      // * create the interval and send the date periodically
      // * clear the interval when the connection gets closed
    } break;

    default: {
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Send Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

Thank you very much for your support!

[Update 2021-11-04]:

I have made some progress doing some research across different sources (https://deno.land/[email protected]/http/server.ts, https://github.com/denoland/deno/issues/4817) and got a step closer to the solution. Using the updated example below at least the setup and usage of the Server Sent Events do work now. The remaining issue (besides cleaning up and refactoring of the code) remains the safe detection when the incoming request has been closed (see comments in the source code below):

#!/usr/bin/env deno run --allow-net

import { listenAndServe as http } from 'https://deno.land/std/http/server.ts';

http({ port: 8080 }, (req) => {
  switch (req.url) {
    case '/server-sent-events': {
      // set up a quick´n´dirty write method without error checking
      req.write = (data) => {
        req.w.write(new TextEncoder().encode(data));
        req.w.flush();
      };

      // setup the server sent event headers
      let headers = '';
      headers += 'HTTP/1.1 200 OK\r\n';
      headers += 'Connection: keep-alive\r\n';
      headers += 'Cache-Control: no-cache\r\n';
      headers += 'Content-Type: text/event-stream\r\n';
      headers += '\r\n';
      req.write(headers);

      // create the interval and send the date periodically
      const sendDate = () => req.write(`data: ${new Date()}\n\n`);
      sendDate();
      const interval = setInterval(sendDate, 1000);

      // final missing step:
      // * clear the interval when the connection gets closed

      // currently dropping the connection from the client will
      // result in the error: Uncaught (in promise) BrokenPipe:
      // Broken pipe (os error 32)
      // this error also does not seem to be catchable in the 
      // req.write method above, so there needs to be another safe
      // way to prevent this error from occurring.
    } break;

    default: {
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Send Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

[Update 2021-04-16]

All issues have been resolved and are posted in my accepted answer below.

Upvotes: 3

Views: 2487

Answers (3)

RandallB
RandallB

Reputation: 5575

This example seems more idiomatic.

import { serve } from "https://deno.land/[email protected]/http/server.ts";

const msg = new TextEncoder().encode("data: hello\r\n");

serve(async (_) => {
  let timerId: number | undefined;
  const body = new ReadableStream({
    start(controller) {
      timerId = setInterval(() => {
        controller.enqueue(msg);
      }, 1000);
    },
    cancel() {
      if (typeof timerId === "number") {
        clearInterval(timerId);
      }
    },
  });
  return new Response(body, {
    headers: {
      "Content-Type": "text/event-stream",
    },
  });
});
console.log("Listening on http://localhost:8000");

Upvotes: 1

Daniel Messner
Daniel Messner

Reputation: 2567

In the end I have found an answer to my question and so the full answer with plenty of comments follows, so you get a working version of server sent events in Deno. The solution below also solves the os error 32, which gets caused by not catching the connection writer flash method:

#!/usr/bin/env deno run --allow-net

// imports
import { ServerRequest, listenAndServe as http } from 'https://deno.land/std/http/server.ts';

// commodity
const encoder = new TextEncoder();
const print = console.log;


// start the web-server
// this one allows the endpoint `/server-sent-events`, which hosts a clock that
// will be refreshed every second (the efficiency of the clock solution could of
// course be optimised, as every client gets its own clock interval, but this
// this does not matter as this example wants to show how to setup and clean a
// task for every connecting client)
// all other requests will be answered with a simple html page that subscribes
// to the sse-based clock
http({ port: 8080 }, async (req) => {
  // ip address of the client (formatted as `ip:port`, so we cut the `:port` part
  // of it)
  const ip = req.headers.get('host').split(':').slice(0, -1).join(':');

  // determine the endpoint to access
  switch (req.url) {
    // host the server sent event based clock
    case '/server-sent-events': {
      // logging
      print(`+ Client ${ip} connected`);

      // prepare the disconnect promise. we will use this one later on to await
      // the clients disconnect, so we can properly clean up. so the promise will
      // be resolved manually by us when we detect a disconnect from the client
      // on an attempt to send new data to him (unfortunately there seems to be
      // no other way to detect when the client actually closed the connection)
      let resolver;
      const disconnect = new Promise((resolve) => resolver = resolve);

      // write helper
      req.write = async (data) => {
        // send the current data to the client
        req.w.write(encoder.encode(data));

        // to actually send the data we need to flush the writer first. we need
        // to try/catch this part, as not handling errors on flush will lead to
        // the `Broken pipe (os error 32)` error
        try {
          await req.w.flush();
        } catch(err) {
          // throw any errors but the broken pipe, which gets thrown when the
          // client has already disconnected and we try to send him new data
          // later on
          if (err.name !== 'BrokenPipe') {
            throw err;
          }

          // close the connection from our side as well
          req.conn.close();

          // resolve our `disconnect` promise, so we can clean up
          resolver();
        }
      };

      // date writer (interval method which pushes the current date to the client)
      const sendDate = async () => await req.write(`data: ${new Date()}\n\n`);

      // prepare and send the headers
      let headers = '';
      headers += `HTTP/1.1 200 OK\r\n`;
      headers += `Connection: keep-alive\r\n`;
      headers += `Cache-Control: no-cache\r\n`;
      headers += `Content-Type: text/event-stream\r\n`;
      headers += `\r\n`;
      await req.write(headers);

      // send the date now for the first time and then every second
      sendDate();
      const interval = setInterval(sendDate, 1000);

      // await until the clients disconnects to clean up. so we will be "stuck"
      // here until a disconnect gets detected as we use a promise based approach
      // to detect the disconnect
      await disconnect;
      clearInterval(interval);

      // logging
      print(`- Client ${ip} disconnected`);
    } break;

    // all other requests host a simple html page which subscribes to the clock
    default: {
      print(`* Serve website to ${ip}`);
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Sent Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

Upvotes: 1

Marcos Casagrande
Marcos Casagrande

Reputation: 40444

Deno's http library doesn't support SSE, but you can use Oak Framework, or implement it yourself.

import { Application, Router } from "https://deno.land/x/oak/mod.ts";

const app = new Application();
const router = new Router();

router.get('/', ctx => {
  ctx.response.body = `
    <!DOCTYPE html>
    <html>
      <head>
        <title>Server Send Events</title>
        <meta charset="utf-8">
        <script>
          const sse = new EventSource('/server-sent-events');
          sse.onerror = () => document.body.innerHTML = 'Connection Error';
          sse.onmessage = ({ data }) => document.body.innerHTML = data;
        </script>
      </head>
      <body></body>
    </html>
  `;
})

router.get("/server-sent-events", (ctx) => {
  const target = ctx.sendEvents();
  const sendDate = () => target.dispatchMessage(`${new Date()}`);
  sendDate();
  const interval = setInterval(sendDate, 1000);
});

app.use(router.routes());
await app.listen({ port: 8080 });

Upvotes: 2

Related Questions