Haider

Reputation: 615

Unable to send notifications to all connections through SSE

I am trying to notify all users of an event through server-sent events (SSE), but it is not working. I can notify a single user if I use a queue, but if I change the configuration to use a topic, nothing is delivered. As you can see below, when I use a queue channel and comment out template.setPubSubDomain(true) in jmsTopicTemplate, the event is sent to a single user only.

@Bean
public Publisher<Message<LocationData>> jmsReactiveSource(ConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(JMSConfig.Location_TOPIC)
                    .jmsMessageConverter(messageConverter()))
            //.channel(new PublishSubscribeChannel(executor()))
            //.channel(MessageChannels.flux())
            .channel(MessageChannels.queue())
            .toReactivePublisher();
}

You can check the code at https://github.com/haiderali22/spring-tracking-jms-sse-mongo-app

Upvotes: 0

Views: 118

Answers (1)

Artem Bilan

Reputation: 121550

Well, I have checked your project, but unfortunately it is still too big to digest properly. From the Spring Integration perspective, though, the flow should look like this:

return IntegrationFlows
        .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                .destination(JMSConfig.Location_TOPIC)
                .jmsMessageConverter(messageConverter())
                .autoStartup(false)
                .id("jmsMessageDrivenChannelAdapter"))
        .toReactivePublisher();

toReactivePublisher() already injects a channel by itself, so there is no need for any other channel in between. The autoStartup(false) defers consumption until the final Flux is subscribed to, so no messages are pulled from JMS before then.

The .id("jmsMessageDrivenChannelAdapter") lets you reference the adapter later in the LocationService, like this:

public Flux<LocationData> watch() {
    return Flux.from(jmsReactiveSource)
            .map(Message::getPayload)
            .doOnSubscribe(s -> jmsMessageDrivenChannelAdapter.start());
}

With this, the adapter starts pulling from JMS only when a real subscription to the Flux happens.

The JMS topic has no relevance to this SSE subject.

If you can make your project simpler, I will try it again. I'm not familiar with Lombok, though...

UPDATE

With the current solution you need to make it like this:

.channel(MessageChannels.flux())
.toReactivePublisher();

The problem is that with a plain .toReactivePublisher() right after the message-driven channel adapter, the final publisher supports only one subscriber.

To make it pub-sub, you definitely need to place a FluxMessageChannel in between. This way all your SSE events are sent to all the JavaScript subscribers.
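
To illustrate what "pub-sub" means here, below is a minimal JDK-only sketch. It does not use Spring Integration or Reactor: java.util.concurrent.SubmissionPublisher merely stands in for the broadcasting role that the FluxMessageChannel plays in the flow. The BroadcastSketch and Client names and the "loc-1"/"loc-2" payloads are made up for the example; each Client is a hypothetical stand-in for one SSE connection.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BroadcastSketch {

    /** Hypothetical stand-in for one SSE client: records every item it receives. */
    static final class Client implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes the items to two clients and returns what each client saw. */
    public static List<List<String>> broadcast(List<String> items) {
        Client first = new Client();
        Client second = new Client();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            // Every submitted item is offered to each current subscriber.
            items.forEach(publisher::submit);
        } // close() signals onComplete to both clients
        try {
            first.done.await(5, TimeUnit.SECONDS);
            second.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for clients", e);
        }
        return List.of(List.copyOf(first.received), List.copyOf(second.received));
    }

    public static void main(String[] args) {
        // Both clients receive the full sequence, not a share of it.
        System.out.println(broadcast(List.of("loc-1", "loc-2")));
    }
}
```

Each subscriber gets every item, which is the behaviour you want for SSE: every open browser connection should see every location update rather than compete with the others for messages.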

Upvotes: 1
