Master Po
Master Po

Reputation: 1697

Spring Server Sent Events - ResponseBodyEmitter is already set complete

I'm using spring 4.3.7.RELEASE to set up the server sent events. And used @scheduled annotation to send messages every 2 seconds. Here is the controller.

@Controller
public class MySSEController {
    private final SseEmitter sseEmitter = new SseEmitter();
    private int counter = 0;

    @RequestMapping("/ssestream")
    public SseEmitter getRealTimeMessageAction() throws IOException {

        sseEmitter.send("MessageCounter : " + counter);
        return sseEmitter;
    }

    @Scheduled(fixedDelay = 2*1000)
    public void scheduledMsgEmitter() throws IOException
    {
        if(null != sseEmitter) {
            sseEmitter.send("MessageCounter : " + ++counter);
        }
    }

}

I'm running this on Tomcat 9 directly from eclipse. The application starts and send messages to the browser at every 2 soconds. But after some time it stops sending messages and I see the below exception in eclipse console.

Mar 16, 2017 6:57:34 PM org.springframework.web.servlet.mvc.support.DefaultHandlerExceptionResolver handleAsyncRequestTimeoutException
SEVERE: Async timeout for GET [/streaming-web/stream/ssestream]
Mar 16, 2017 6:57:35 PM org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler handleError
SEVERE: Unexpected error occurred in scheduled task.
java.lang.IllegalStateException: ResponseBodyEmitter is already set complete
    at org.springframework.util.Assert.state(Assert.java:70)
    at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:158)
    at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:126)
    at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:107)
    at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:89)
    at com.mycomp.test.controllers.MySSEController.scheduledMsgEmitter(MySSEController.java:25)
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Am I missing something? Please help me to resolve this problem.

Upvotes: 7

Views: 17431

Answers (4)

Linking
Linking

Reputation: 21

Record info I have got about SSE.

Points:

  • 1.Timeout default is 30s, when there end, frontend will trigger onerror listener, backend will hit sseEmitter.onComplete()
  • 2.Scheduled, I think it's not a good tool to send msg; my suggest is ScheduledExecutorService

Answer:

  • 1.Which @NeoJhou has mentioned, new SseEmitter(0L) is work for me
  • 2.In frontend code, 'you' may close() the eventsource connect when error happened
  • 3.In backend code, also need complete() manually

References:

Upvotes: 2

NeoJhou
NeoJhou

Reputation: 21

I used new org.springframework.web.servlet.mvc.method.annotation.SseEmitter(0L) without completion of events.

Upvotes: 2

rwfbc
rwfbc

Reputation: 998

The SseEmitter timeout is in milliseconds: 60001 is 60 seconds. It is a session timeout, not affected by activity on the session.

The timeout needs to be set to the expected duration of the session, in milliseconds. So, 86400000 (or more) is entirely appropriate.

Upvotes: 4

Master Po
Master Po

Reputation: 1697

I'm not sure whether it is the correct solution.

I've slightly modified the implementation to work with all connected clients Earlier I was unable to deliver the same message to different clients connected to this stream This implementation doesn't throw the IllegalStateException.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Controller
public class MySSEController_Working {

    private Set<SseEmitter> sseEmitters = new HashSet<SseEmitter>();
    private int messageCount = 0;

    @RequestMapping("/ssestream")
    public SseEmitter getRealTimeMessageAction(HttpServletRequest request, HttpServletResponse response) throws IOException {

        final SseEmitter sseEmitter = new SseEmitter();

        sseEmitter.onCompletion(() -> {
            synchronized (this.sseEmitters) {
                this.sseEmitters.remove(sseEmitter);
            }
        });

        sseEmitter.onTimeout(()-> {
            sseEmitter.complete();
        });

        // Put context in a map
        sseEmitters.add(sseEmitter);

        return sseEmitter;
    }

    @Scheduled(fixedDelay = 2*1000)
    public void scheduledMsgEmitter() throws IOException
    {
        if(!sseEmitters.isEmpty())
            ++messageCount;
        else 
            System.out.println("No active Emitters ");

        System.out.println("Sent Messages : " + messageCount);

        sseEmitters.forEach(emitter -> {
            if (null != emitter)
                try {
                    System.out.println("Timeout : "+ emitter.getTimeout());
                    emitter.send("MessageCounter : " + messageCount);
                    emitter.complete();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        });
    }
}

Upvotes: 6

Related Questions