mgd
mgd

Reputation: 4342

Server-Sent Events (SSE) wit RESTEasy exhausting Jetty's thread pool

EDIT:


Question

We are trying to implement Server-Sent Events (SSE) using

but we are having the problem that each SSE subscription consumes and holds on to a thread in Jetty's thread pool for the entire duration of the subscription.

If we use Jersey instead of RESTEasy, everything works as expected.

Is there are way to configure RESTEasy to not exhausting Jetty's thread pool?

Code implementing SSE using RESTEasy

A resource class to handle SSE subscriptions and push events to a subscriber every second:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;

@Path("/events")
public class EventsResource {
    private final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    private final static AtomicInteger channelIds = new AtomicInteger();

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void subscribe(@Context SseEventSink sink, @Context Sse sse) {
          final int channelId = channelIds.getAndIncrement();
          final Runnable sendMessage = () -> {
                final OutboundSseEvent event = sse.newEventBuilder()
                    .name("message")
                    .data(String.class, "Hello client " + channelId + "!")
                    .build();
                sink.send(event);
          };
          executor.scheduleAtFixedRate(sendMessage, 0, 1, TimeUnit.SECONDS);
    }
}

The resource class is added to the set of classes in our Application class:

import java.util.HashSet;
import java.util.Set;
import jakarta.ws.rs.core.Application;

public class RestApplication extends Application {
    /** {@inheritDoc} */
    @Override
    public Set<Class<?>> getClasses() {
        Set<Class<?>> classes = new HashSet<>();
        classes.add(EventsResource.class);
        return classes;
    }
}

Finally, we have our main class where we start Jetty with our application and a minimal thread pool of 5 threads:

import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.example.rest.RestApplication;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;

public class Main {
  public static void main(String[] args) throws Exception {
    Server server = new Server(new QueuedThreadPool(5));

    ServerConnector serverConnector = new ServerConnector(server);
    serverConnector.setPort(8081);
    server.setConnectors(new Connector[] { serverConnector });

    ServletHolder servletHolder = new ServletHolder(new HttpServletDispatcher());
    servletHolder.setInitParameter("jakarta.ws.rs.Application", RestApplication.class.getName());
    servletHolder.setAsyncSupported(true);
    ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
    servletContextHandler.addServlet(servletHolder, "/*");
    server.setHandler(servletContextHandler);

    server.start();
    server.join();
  }
}

Exhausting Jetty's thread pool

When subscribing to SSE, we get:

% curl "http://localhost:8081/events" -H 'Accept: text/event-stream'

event: message
data: 0: Hello client 0!

event: message
data: 1: Hello client 0!

event: message
data: 2: Hello client 0!

...

If we start 2 parallel curl commands, the Jetty thread pool is exhausted and the 3rd request will hang waiting for a thread.

Upvotes: 1

Views: 73

Answers (1)

talex
talex

Reputation: 20544

First you add scheduler to your class:

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

Then change your method to

  @GET
  @Produces("text/event-stream")
  public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
    executor.scheduleAtFixedRate(()-> {
              final OutboundSseEvent event = sse.newEventBuilder()
                          .name("message")
                          .data(String.class, "Hello client!")
                          .build();
              sink.send(event);
    }, 0, 1, TimeUnit.SECONDS);
  }

Upvotes: 0

Related Questions