Reputation: 4342
EDIT: The example no longer calls Thread.sleep(), using an executor instead, and it now sets servletHolder.setAsyncSupported(true).
We are trying to implement Server-Sent Events (SSE) using RESTEasy on Jetty, but each SSE subscription consumes and holds on to a thread in Jetty's thread pool for the entire duration of the subscription.
If we use Jersey instead of RESTEasy, everything works as expected.
Is there a way to configure RESTEasy so that it does not exhaust Jetty's thread pool?
A resource class to handle SSE subscriptions and push events to a subscriber every second:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;

@Path("/events")
public class EventsResource {
    private final static ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
    private final static AtomicInteger channelIds = new AtomicInteger();

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void subscribe(@Context SseEventSink sink, @Context Sse sse) {
        final int channelId = channelIds.getAndIncrement();
        // Build and push one event; scheduled below to run once per second.
        final Runnable sendMessage = () -> {
            final OutboundSseEvent event = sse.newEventBuilder()
                    .name("message")
                    .data(String.class, "Hello client " + channelId + "!")
                    .build();
            sink.send(event);
        };
        executor.scheduleAtFixedRate(sendMessage, 0, 1, TimeUnit.SECONDS);
    }
}
The resource class is added to the set of classes in our Application class:
import java.util.HashSet;
import java.util.Set;

import jakarta.ws.rs.core.Application;

public class RestApplication extends Application {
    /** {@inheritDoc} */
    @Override
    public Set<Class<?>> getClasses() {
        Set<Class<?>> classes = new HashSet<>();
        classes.add(EventsResource.class);
        return classes;
    }
}
Finally, we have our main class where we start Jetty with our application and a minimal thread pool of 5 threads:
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHolder;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.example.rest.RestApplication;
import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher;

public class Main {
    public static void main(String[] args) throws Exception {
        Server server = new Server(new QueuedThreadPool(5));
        ServerConnector serverConnector = new ServerConnector(server);
        serverConnector.setPort(8081);
        server.setConnectors(new Connector[] { serverConnector });

        ServletHolder servletHolder = new ServletHolder(new HttpServletDispatcher());
        servletHolder.setInitParameter("jakarta.ws.rs.Application", RestApplication.class.getName());
        servletHolder.setAsyncSupported(true);

        ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
        servletContextHandler.addServlet(servletHolder, "/*");
        server.setHandler(servletContextHandler);

        server.start();
        server.join();
    }
}
When subscribing to SSE, we get:
% curl "http://localhost:8081/events" -H 'Accept: text/event-stream'
event: message
data: 0: Hello client 0!
event: message
data: 1: Hello client 0!
event: message
data: 2: Hello client 0!
...
If we start 2 parallel curl commands, the Jetty thread pool is exhausted and a 3rd request hangs waiting for a thread.
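As a rough analogy for the exhaustion described above, here is a sketch that uses only java.util.concurrent. A plain fixed thread pool stands in for Jetty's QueuedThreadPool (it is not the same implementation, and Jetty also reserves threads for its own use), and the class and method names are made up for illustration. It only shows the general effect: once every thread is parked in a long-running task, a new task waits in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PoolExhaustionSketch {

    /**
     * Occupies `held` threads of a fixed pool of size `poolSize` with blocking
     * tasks (standing in for request threads parked for a whole subscription),
     * then reports whether one more task gets to run within 200 ms.
     */
    static boolean canServeAnotherRequest(int poolSize, int held) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch started = new CountDownLatch(held);
        try {
            for (int i = 0; i < held; i++) {
                pool.submit(() -> {
                    started.countDown();
                    release.await(); // blocks until the demo is over
                    return null;
                });
            }
            // Give the blocking tasks time to claim their threads.
            started.await(1, TimeUnit.SECONDS);
            Future<String> probe = pool.submit(() -> "served");
            probe.get(200, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the probe is still queued behind the blocked tasks
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("all threads held: " + canServeAnotherRequest(2, 2));
        System.out.println("one thread free:  " + canServeAnotherRequest(2, 1));
    }
}
```

If RESTEasy really keeps the request thread for the lifetime of the sink, each open subscription would behave like one of the blocking tasks here, regardless of which executor sends the events.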
Upvotes: 1
Views: 73
Reputation: 20544
First, add a scheduler to your class:
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Then change your method to:
@GET
@Produces("text/event-stream")
public void subscribe(@Context SseEventSink sink, @Context Sse sse) throws InterruptedException {
    executor.scheduleAtFixedRate(() -> {
        final OutboundSseEvent event = sse.newEventBuilder()
                .name("message")
                .data(String.class, "Hello client!")
                .build();
        sink.send(event);
    }, 0, 1, TimeUnit.SECONDS);
}
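One thing the method above leaves open is when the periodic task stops: as written, it keeps firing after the client has gone away. A possible approach, sketched here under assumptions, is to check whether the sink is closed and cancel the ScheduledFuture from inside the task. To keep the sketch runnable without JAX-RS on the classpath, Sink is a hypothetical stand-in for SseEventSink (which does offer isClosed()), and RecordingSink, publish and runDemo are illustrative names. Whether and when isClosed() turns true after a disconnect depends on the implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CancellingPublisherSketch {

    /** Hypothetical stand-in for the parts of SseEventSink used here. */
    interface Sink {
        boolean isClosed();
        void send(String data);
    }

    /** Demo sink: records messages and reports closed after `limit` sends. */
    static final class RecordingSink implements Sink {
        final List<String> received = new CopyOnWriteArrayList<>();
        private final int limit;

        RecordingSink(int limit) { this.limit = limit; }

        @Override public boolean isClosed() { return received.size() >= limit; }
        @Override public void send(String data) { received.add(data); }
    }

    /** Sends periodically and cancels its own schedule once the sink is closed. */
    static ScheduledFuture<?> publish(ScheduledExecutorService executor, Sink sink, long periodMillis) {
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        Runnable task = () -> {
            if (sink.isClosed()) {
                ScheduledFuture<?> handle = self.get();
                if (handle != null) {
                    handle.cancel(false); // stop future runs without interrupting this one
                }
                return;
            }
            sink.send("Hello client!");
        };
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        self.set(future);
        return future;
    }

    /** Runs the publisher against a sink that closes after `limit` events; returns the number sent. */
    static int runDemo(int limit) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            RecordingSink sink = new RecordingSink(limit);
            ScheduledFuture<?> future = publish(executor, sink, 10);
            while (!future.isDone()) {
                Thread.sleep(5);
            }
            return sink.received.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("events sent before cancel: " + runDemo(3));
    }
}
```

In the real resource, the same check would go at the top of the lambda passed to scheduleAtFixedRate, with the returned ScheduledFuture kept per subscription.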
Upvotes: 0