Reputation: 311
I'm currently implementing a notification system based on some examples I found while reading articles about Spring and SSE (in a non-reactive way).
I've managed to implement a solution, and it works well when a single client consumes the events sent by the backend.
The problem appears when I open several browsers and try to fire the events to all the consumers: only the last client that subscribed to the SSE broadcaster endpoint receives the notification.
How can I fire events to multiple clients at the same time?
Or is it normal to have a single SSE connection when the clients are on the same network?
Here is my controller:
@RestController
@RequestMapping("/sse/servlet")
class EventController {
    private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
    private val emitters: MutableMap<String, SseEmitter> = mutableMapOf()
    private val objectMapper: ObjectMapper = ObjectMapper()
    private val log = KotlinLogging.logger {}

    @GetMapping("/notifications")
    fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
        if (emitters[eventId] != null) {
            log.info("SSE connection already exists")
            return emitters[eventId]
        }
        val emitter = SseEmitter(TimeUnit.MINUTES.toMillis(10))
        emitter.onCompletion {
            log.info("SSE connection closed")
            emitters.remove(eventId)
        }
        emitter.onTimeout {
            log.info("SSE connection timed out")
            emitter.complete()
        }
        emitter.onError { throwable: Throwable? ->
            log.error("Listen SSE exception", throwable)
        }
        emitters[eventId] = emitter
        return emitter
    }

    @PostMapping("/notifications")
    @ResponseStatus(ACCEPTED)
    fun fireNotification(
        @RequestParam eventId: String,
        @RequestBody notification: Notification
    ) {
        val sseEmitter = emitters[eventId]
        if (sseEmitter === null) {
            log.info("SSE connection does not exist")
            return
        }
        handleEmitter(notification, sseEmitter, eventId)
    }

    private fun handleEmitter(
        notification: Notification,
        sseEmitter: SseEmitter,
        eventId: String
    ) = try {
        val data = objectMapper.writeValueAsString(notification)
        val sseEventBuilder = event().data(data)
        sseEmitter.send(sseEventBuilder)
    } catch (ioException: IOException) {
        log.error("Send SSE exception", ioException)
        emitters.remove(eventId)
    }
}
The notification model:
data class Notification(val message: String)
My pretty simple application.yaml properties file:
server:
  servlet:
    context-path: /api
My Gradle configuration:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.0.4"
    id("io.spring.dependency-management") version "1.1.0"
    kotlin("jvm") version "1.7.22"
    kotlin("plugin.spring") version "1.7.22"
}

group = "com.ggardet"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17

repositories {
    mavenCentral()
}

dependencies {
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-security")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}
And finally, the requests used to create/consume the events (using HTTPie):
# to listen for events I open 2 or 3 terminals and launch the following request
http --stream -a user:user -v GET http://localhost:8080/api/sse/servlet/notifications\?eventId\=1
# to fire a new event I open another terminal instance and launch this single request
http -a user:user -v POST http://localhost:8080/api/sse/servlet/notifications\?eventId\=1 message=test
Note: I have the same issue if I remove the "emitters" map and use a single SseEmitter at the class level to send the events.
Upvotes: 1
Views: 3207
Reputation: 974
I have had some success using Sse-EventBus by ralscha. I wrote up my experience, as of a few months ago, here: https://github.com/ralscha/sse-eventbus/issues/26
My code has changed a bit since then, but essentially it came down to the following, which the library implements as an abstraction of what you did on your own:
sseEventBus.createSseEmitter(String clientId, Long timeout, String... events)
to set up a given client subscription.
@GetMapping("/api/beds/{bedId}/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun events(@PathVariable bedId: UUID,
           @RequestParam clientId: UUID
): ResponseEntity<SseEmitter> {
    val bed = BedRepository.getBed(bedId)
    val eventNames = getEventNamesFromBedCells(bed)
    return ResponseEntity.ok(sseEventBus.createSseEmitter(
            clientId.toString(),
            120_000L,
            *eventNames.toTypedArray()
        )
    )
}

private fun getEventNamesFromBedCells(bed: BedAggregate?) =
    bed!!.rows.flatMap { row ->
        row.cells.flatMap { cell ->
            listOf("events-$cell", "plants-$cell")
        }
    }
sseEventBus.handleEvent(SseEvent event)
to let the bus publish to all your subscribed clients.
class BedEventBus(private val sseEventBus: SseEventBus) : IBedEventBus {
    override fun publishEvent(command: CellCommand, event: BedEvent) {
        val eventInfo = when(event) {
            is BedCellPlanted -> "plants-${event.bedCellId}" to plantTypeToIcon(event.plantType)
            else -> "events-${event.bedCellId}" to (iconMap[event.javaClass.simpleName] ?: "")
        }
        return sseEventBus.handleEvent(SseEvent.of(eventInfo.first, eventInfo.second))
    }
}
That's basically it, ralscha's library does all the heavy lifting!
In this example, I've created two clients on the CLI, and have one in the browser. I ran one "plant" command, then one "water-one" command, finally "plant-many.sh" which results in the plant emojis flowing into their right locations on the screen.
Note: I'm specifically publishing events as snippets of HTML / plain text instead of JSON, since I'm using this with the HTMX SSE extension and following its "multiple events" approach. I use plants-$cellUuid and events-$cellUuid to generate system-wide unique event names for any given garden bed's individual cells. This lets me have a monitor running in multiple locations to get updates on any given bed.
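To make the "subscribe each client to named events, then publish by name" idea concrete, here is a rough in-memory sketch. It is not how Sse-EventBus is implemented: TinyEventBus, subscribe and publish are names made up for illustration, and the deliver callback stands in for writing to a client's SseEmitter.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the idea behind an SSE event bus; NOT the Sse-EventBus implementation.
class TinyEventBus {
    // What one client asked for, plus a callback that would write to its connection.
    private class Subscription(val events: Set<String>, val deliver: (String, String) -> Unit)

    // One entry per client id, so every client keeps its own connection.
    private val clients = ConcurrentHashMap<String, Subscription>()

    fun subscribe(clientId: String, vararg events: String, deliver: (event: String, data: String) -> Unit) {
        clients[clientId] = Subscription(events.toSet(), deliver)
    }

    fun unsubscribe(clientId: String) {
        clients.remove(clientId)
    }

    // Deliver to every client subscribed to this event name; returns how many were reached.
    fun publish(event: String, data: String): Int {
        val targets = clients.values.filter { event in it.events }
        targets.forEach { it.deliver(event, data) }
        return targets.size
    }
}

fun main() {
    val bus = TinyEventBus()
    val received = mutableListOf<String>()
    bus.subscribe("cli-1", "plants-a1", "events-a1") { e, d -> received += "cli-1 $e $d" }
    bus.subscribe("cli-2", "plants-a1") { e, d -> received += "cli-2 $e $d" }
    bus.subscribe("browser", "plants-b2") { e, d -> received += "browser $e $d" }

    println(bus.publish("plants-a1", "tomato")) // prints 2: cli-1 and cli-2 are subscribed
    println(received.size)                      // prints 2
}
```

The point is only that the bus is keyed by client, not by event, so two clients interested in the same event never share a connection.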
Upvotes: 1
Reputation: 311
So I didn't realize clients can't share the same SseEmitter.
I had to create one SseEmitter per subscription to make it work.
To answer the questions:
Not sure if it's the best way to do this, but I'm posting my solution here in case someone else misunderstands how SseEmitter works the same way I did:
@RestController
@RequestMapping("/sse/servlet")
class EventController {
    private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
    private val emitters = ConcurrentHashMap<String, MutableList<SseEmitter>>()
    private val objectMapper: ObjectMapper = ObjectMapper()
    private val log = KotlinLogging.logger {}

    @GetMapping("/notifications", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
        val emitter = SseEmitter(Duration.ofMinutes(10).toMillis())
        emitter.onCompletion {
            log.info("SSE connection closed")
            emitters.remove(eventId)
        }
        emitter.onTimeout {
            log.info("SSE connection timed out")
            emitters.remove(eventId)
            emitter.complete()
        }
        emitter.onError { throwable: Throwable? ->
            log.error("Listen SSE exception", throwable)
            emitters.remove(eventId)
        }
        emitters.computeIfAbsent(eventId) { mutableListOf() }.add(emitter)
        return emitter
    }

    @PostMapping("/notifications")
    @ResponseStatus(ACCEPTED)
    fun fireNotification(
        @RequestParam eventId: String,
        @RequestBody notification: Notification
    ) = emitters[eventId]?.forEach { handleEmitter(notification, it) }

    private fun handleEmitter(
        notification: Notification,
        sseEmitter: SseEmitter,
    ) = try {
        val data = objectMapper.writeValueAsString(notification)
        val sseEventBuilder = event().data(data)
        sseEmitter.send(sseEventBuilder)
    } catch (ioException: IOException) {
        log.error("Send SSE exception", ioException)
    }
}
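One caveat with the controller above: the callbacks call emitters.remove(eventId), which drops every subscriber for that event id as soon as a single connection closes, and mutableListOf() is not safe for concurrent access. A possible refinement is to remove only the emitter that closed. The sketch below assumes a hypothetical SubscriberRegistry helper (not part of Spring); it is generic so that it runs without Spring, and in the controller T would be SseEmitter.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper, not part of Spring: tracks every subscriber per event id.
class SubscriberRegistry<T : Any> {
    private val subscribers = ConcurrentHashMap<String, CopyOnWriteArrayList<T>>()

    // Register one subscriber (e.g. one SseEmitter) under the given event id.
    fun add(eventId: String, subscriber: T) {
        subscribers.computeIfAbsent(eventId) { CopyOnWriteArrayList() }.add(subscriber)
    }

    // Remove only this subscriber; drop the key once its list is empty.
    fun remove(eventId: String, subscriber: T) {
        subscribers.computeIfPresent(eventId) { _, list ->
            list.remove(subscriber)
            if (list.isEmpty()) null else list
        }
    }

    // Snapshot of the current subscribers for an event id.
    fun get(eventId: String): List<T> = subscribers[eventId]?.toList() ?: emptyList()
}

fun main() {
    val registry = SubscriberRegistry<String>()
    registry.add("1", "client-a")
    registry.add("1", "client-b")
    registry.remove("1", "client-a") // simulates one connection closing
    println(registry.get("1"))       // prints [client-b]
}
```

In the controller this would presumably look like emitter.onCompletion { registry.remove(eventId, emitter) }, with the same call in onTimeout and onError, so the other clients listening on the same eventId keep receiving notifications.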
Upvotes: 4