Reputation: 79
I have implemented Server-Sent Events using Project Reactor to set up one-way communication from server to client.
On the server, I add a flux connection of type String:
Flux<String> addDataConnection() {
    return Flux.create(emitter -> {
        fluxSinks.add(emitter);
    });
}
Whenever I receive a message, I propagate it to every registered sink:
void propagateMessage(String messageContent) {
    fluxSinks.parallelStream()
            .forEach(fluxSink -> fluxSink.next(messageContent));
}
This data comes from an external source, and so the frequency of incoming events is outside of my control.
Therefore I have added an extra heartbeat stream, which I merge with my data connection stream. This way I can both keep the connection alive and detect terminated connections, e.g. when the browser window is closed.
Flux.interval(Duration) actually emits Long values, but I map each one to a String to match the element type of the data connection. This string is not actually used by anyone:
Flux<String> addHeartbeat() {
    return Flux.interval(Duration.ofSeconds(2))
            .map(i -> "heartbeat");
}
My question is about the element type of the flux. Since my data connection currently emits strings, I can simply emit a string from the heartbeat as well, and therefore return a Flux<String> from the merge of the two fluxes:
Flux<String> initiateFluxStreams() {
    return Flux.merge(addDataConnection(), addHeartbeat());
}
But what if my data stream emits a more complex object and I want to ensure type safety in my flux merger method?
I can always return a Flux<?> to make the compiler happy, but I am not really happy with this solution, as it allows me (or others) to return anything they want.
Does merging fluxes support stricter compile-time type safety?
EDIT: I have seen the answer at https://stackoverflow.com/a/49082329/3432964, and how it uses <? extends someMarkerInterface> for the type. Is this my only option? And is it any better to wrap the whole thing in a ServerSentEvent object?
Upvotes: 1
Views: 681
Reputation: 72399
There's no "magic" way to do this with a merged Flux of different types, as you have no guarantee that the types will have any common ancestor (aside from Object, but as you point out, that's not too useful).
If your type is Foo, you could have a "special" instance (or subclass) of Foo just denoted for an empty heartbeat type, but I don't particularly like that approach - it involves potentially making design sacrifices just to crowbar a certain type in there.
My preferred approach (caution, haven't seen this elsewhere, so I can't claim this as a "common approach") is to use a Flux<Optional<Foo>>, where the "heartbeat" triggers are just empty optionals. Something like:
Flux<Foo> addDataConnection() {
    return Flux.create(emitter -> {
        fluxSinks.add(emitter);
    });
}
Flux<Optional<Foo>> initiateFluxStreams() {
    return Flux.merge(
            // real data is wrapped in a present Optional
            addDataConnection().map(f -> Optional.of(f)),
            // each heartbeat tick becomes an empty Optional
            addHeartbeat().map(h -> Optional.<Foo>empty()));
}
Flux<Long> addHeartbeat() {
    return Flux.interval(Duration.ofSeconds(2));
}
You can then just filter & map the resulting Flux to weed out the heartbeats before processing:
fluxOfOptionalFoo
        .filter(Optional::isPresent)
        .map(Optional::get);
I prefer this approach for a number of reasons:
Flux of a type that "may or may not be there"
If you want to, you could of course implement your own "heartbeat" generic class if you don't think optional is "descriptive" enough, but personally I wouldn't say that's necessary.
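The wrap, filter and unwrap steps above can be tried without Reactor on the classpath. The following is only a rough sketch that uses java.util.stream.Stream as a stand-in for Flux; the Foo class, OptionalHeartbeatDemo and payloadsOnly are hypothetical names made up for illustration, and a real SSE endpoint would still use the Flux version shown earlier.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OptionalHeartbeatDemo {
    // Hypothetical payload type standing in for the answer's Foo.
    static final class Foo {
        final String name;
        Foo(String name) { this.name = name; }
    }

    // Drops heartbeats (empty optionals) and unwraps the remaining payloads.
    static List<Foo> payloadsOnly(Stream<Optional<Foo>> merged) {
        return merged
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Data is wrapped with Optional.of, heartbeats are Optional.empty.
        Stream<Optional<Foo>> merged = Stream.of(
                Optional.of(new Foo("a")),
                Optional.<Foo>empty(),
                Optional.of(new Foo("b")));
        payloadsOnly(merged).forEach(foo -> System.out.println(foo.name));
    }
}
```

The point of the sketch is that the element type stays Optional<Foo> all the way through, so the compiler rejects anything that is neither a Foo nor an empty heartbeat.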
Upvotes: 1
Reputation: 14819
As in many protocols (REST APIs, etc.), I would implement a class containing metadata and a body payload.
// This is just some type of pseudo code
{
    private type: HEARTBEAT
}
{
    private type: DATA
    private payload: {
        // Some type of payload
    }
}
So define a class that can handle multiple types of payload by using generics:
new Request<Foo>();
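One possible shape for such a class, as a minimal sketch only: the Type enum, the static factory methods and the accessor names below are assumptions and not part of the answer above, which only names Request<Foo>. The sketch uses factories instead of a public constructor so that a heartbeat can never carry a payload.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical envelope: metadata (the type) plus an optional body payload.
final class Request<T> {
    enum Type { HEARTBEAT, DATA }

    private final Type type;
    private final T payload; // null only when type is HEARTBEAT

    private Request(Type type, T payload) {
        this.type = type;
        this.payload = payload;
    }

    // A heartbeat carries metadata only.
    static <T> Request<T> heartbeat() {
        return new Request<>(Type.HEARTBEAT, null);
    }

    // A data message must carry a non-null payload.
    static <T> Request<T> data(T payload) {
        return new Request<>(Type.DATA, Objects.requireNonNull(payload));
    }

    Type getType() { return type; }

    Optional<T> getPayload() { return Optional.ofNullable(payload); }
}

class RequestDemo {
    public static void main(String[] args) {
        Request<String> beat = Request.heartbeat();
        Request<String> data = Request.data("hello");
        System.out.println(beat.getType() + " " + beat.getPayload().isPresent());
        System.out.println(data.getType() + " " + data.getPayload().get());
    }
}
```

With a type like this, both the data flux and the heartbeat flux could be mapped to Request<Foo> before merging, so the merged flux keeps a single concrete element type.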
Upvotes: 0