Reputation: 615
I am trying to notify all the users of an event through server sent event but don't know why but its not working. I am able to notify single user if i use queue but if i change the configuration to use topic it does nothing. A you see below i am use queue and comment out template.setPubSubDomain(true) in jmsTopicTemplate then it is sending to single user
@Bean
public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter()))
//.channel(new PublishSubscribeChannel(executor()))
// .channel(MessageChannels.flux())
.channel(MessageChannels.queue())
.toReactivePublisher();
}
You can check the code at https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app
Upvotes: 0
Views: 118
Reputation: 121550
Well, I have checked your project unfortunately it is still big enough to digest properly. But what I see from Integration perspective should be like this:
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(JMSConfig.Location_TOPIC).
jmsMessageConverter(messageConverter())
.autoStartup(false)
.id("jmsMessageDrivenChannelAdapter"))
.toReactivePublisher();
The toReactivePublisher()
injects the channel by itself already. No need in any other in between. The autoStartup(false)
is for deferred subscription into the final Flux
. This way you won't pull messages from JMS until a subscription happens to the Flux
in the end.
The .id("jmsMessageDrivenChannelAdapter"))
you will use later in the LocationService
for this:
public Flux<LocationData> watch() {
return Flux.from(jmsReactiveSource)
.map(Message::getPayload)
.doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}
This way you won't start pulling from JMS until a real subscription happens into a Flux
.
The JMS topic has no relevance to this SSE subject.
If you can make your project more simpler, I would try it again. I'm not familiar with Lombok though...
UPDATE
With the current solution you need to make it like this:
.channel(MessageChannels.flux())
.toReactivePublisher();
The problem that with regular .toReactivePublisher()
just after that message-driven we get only one-subscriber for the final publisher.
to make it pub-sub you definitely need to place a FluxMessageChannel
in between. This way all your SEE are going to be sent to all the JavaScript subscribers.
Upvotes: 1