Reputation: 3142
I have a Spring Integration flow that produces messages which should be kept around until an appropriate consumer comes along and consumes them.
@Bean
public IntegrationFlow messagesPerCustomerFlow() {
    return IntegrationFlows
            .from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .requestPayloadType(JsonNode.class)
                    .headerExpression("customer", "#pathVariables.customer")
            )
            .channel(messagesPerCustomerQueue())
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(100);
}

@Bean
public QueueChannel messagesPerCustomerQueue() {
    return MessageChannels.queue()
            .get();
}
The messages in the queue should be delivered as server-sent events via http as shown below.
The PublisherSubscription is just a holder for the Publisher and the IntegrationFlowRegistration; the latter is used to destroy the dynamically created flow when it is no longer needed (note that the incoming message for the GET has no content, which is not handled properly ATM by the Webflux integration, hence a small workaround is necessary to get access to the path variable copied to the customer header):
@Bean
public IntegrationFlow eventMessagesPerCustomer() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/events/{customer}")
                    .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
                    .headerExpression("customer", "#pathVariables.customer")
                    .payloadExpression("''") // needed to make handle((p, h) -> ...) work
            )
            .log()
            .handle((p, h) -> {
                String customer = h.get("customer").toString();
                PublisherSubscription<JsonNode> publisherSubscription =
                        subscribeToMessagesPerCustomer(customer);
                return Flux.from(publisherSubscription.getPublisher())
                        .map(Message::getPayload)
                        .doFinally(signalType ->
                                publisherSubscription.unsubscribe());
            })
            .get();
}
The above request for server-sent events dynamically registers a flow which subscribes to the queue channel on demand, with a selective consumer realized by a filter with throwExceptionOnRejection(true). According to the spec for the Message Handler chain, that should ensure that the message is offered to all consumers until one accepts it.
public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
    IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
            .filter("headers.customer=='" + customer + "'",
                    filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
    Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
    IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
            .register();
    return new PublisherSubscription<>(messagePublisher, registration);
}
This construct works in principle, but with the following issues:
MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed
What I want is that the message remains in the queue and is repeatedly offered to all subscribers until it is either consumed or expires (a proper selective consumer). How can I do that?
Upvotes: 2
Views: 444
Reputation: 121382
note that the incoming message for the GET has no content, which is not handled properly ATM by the Webflux integration
I don't understand this concern.
The WebFluxInboundEndpoint works with this algorithm:
if (isReadable(request)) {
    ...
}
else {
    return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}
Here the GET method really goes to the else branch, and the payload of the message to send is a MultiValueMap. We also recently fixed, together with you, the problem for POST, which has already been released in version 5.0.5: https://jira.spring.io/browse/INT-4462
Dispatcher has no subscribers
That can't happen on a QueueChannel in principle. There is no dispatcher there at all: it is just a queue, and the sender offers the message to be stored. You are missing something else to share with us. But let's call things by their proper names: the messagesPerCustomerQueue is not a QueueChannel in your application.
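To make the distinction concrete, here is a minimal plain-Java model (not Spring Integration's actual classes; DirectLikeChannel and QueueLikeChannel are hypothetical names used only for illustration). A dispatcher-backed channel has to hand the message to a subscriber at send time and fails when there is none, whereas a queue-backed channel merely stores the message:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of a subscribable channel: send() dispatches immediately.
class DirectLikeChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void send(String message) {
        if (subscribers.isEmpty()) {
            // Same spirit as "Dispatcher has no subscribers".
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        // Simplification: always hand the message to the first subscriber.
        subscribers.get(0).accept(message);
    }
}

// Hypothetical model of a pollable queue channel: send() only stores.
class QueueLikeChannel {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    boolean send(String message) {
        return queue.offer(message); // no dispatcher involved
    }

    String receive() {
        return queue.poll(); // null when the queue is empty
    }
}

class ChannelModelDemo {
    public static void main(String[] args) {
        QueueLikeChannel queueChannel = new QueueLikeChannel();
        // Accepted even though nobody is consuming yet.
        System.out.println(queueChannel.send("hello"));
        try {
            new DirectLikeChannel().send("hello");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

So an exception that mentions a dispatcher points at a subscribable channel somewhere in the flow, not at an in-memory queue.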
UPDATE
Regarding:
What I want is that the message remains in the queue and is repeatedly offered to all subscribers until it is either consumed or expires (a proper selective consumer)
The only option we see is a PollableJmsChannel backed by an embedded ActiveMQ, which honors TTL for messages. As the consumer of this queue you should have a PublishSubscribeChannel with setMinSubscribers(1), so that the MessagingTemplate throws a MessageDeliveryException when there are no subscribers yet. This way the JMS transaction is rolled back and the message returns to the queue for the next polling cycle.
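The redelivery behaviour described above can be sketched in plain Java, without JMS or Spring Integration. This is only a simplified model under stated assumptions: TimedMessage and RedeliveringQueue are hypothetical names, a subscriber is modelled as a Predicate that returns true when it accepts a message, and "rollback" is simulated by putting the message back at the end of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical message: a customer header, a payload and an absolute expiry time.
class TimedMessage {
    final String customer;
    final String payload;
    final long expiresAtMillis;

    TimedMessage(String customer, String payload, long expiresAtMillis) {
        this.customer = customer;
        this.payload = payload;
        this.expiresAtMillis = expiresAtMillis;
    }
}

// Hypothetical stand-in for a transactional queue with selective consumers.
class RedeliveringQueue {
    private final Deque<TimedMessage> queue = new ArrayDeque<>();
    private final List<Predicate<TimedMessage>> subscribers = new ArrayList<>();

    void send(TimedMessage message) {
        queue.addLast(message);
    }

    void subscribe(Predicate<TimedMessage> subscriber) {
        subscribers.add(subscriber);
    }

    int size() {
        return queue.size();
    }

    // One polling cycle over the messages currently queued.
    // Returns the number of messages that were consumed.
    int pollOnce(long nowMillis) {
        int consumed = 0;
        int pending = queue.size();
        for (int i = 0; i < pending; i++) {
            TimedMessage message = queue.pollFirst();
            if (message.expiresAtMillis <= nowMillis) {
                continue; // TTL elapsed: drop the message
            }
            boolean accepted = false;
            for (Predicate<TimedMessage> subscriber : subscribers) {
                if (subscriber.test(message)) {
                    accepted = true;
                    break;
                }
            }
            if (accepted) {
                consumed++;
            } else {
                // Simulated rollback: keep the message for the next cycle.
                queue.addLast(message);
            }
        }
        return consumed;
    }
}
```

With no subscribers, or with subscribers that all reject a message, pollOnce leaves it in the queue; it disappears only once it is accepted or its expiry time has passed.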
The problem with the in-memory QueueChannel is that there is no transactional redelivery, so a message, once polled from that queue, is going to be lost.
Another option, similar to JMS (transactional), is a JdbcChannelMessageStore for the QueueChannel, although this way we don't have TTL functionality...
Upvotes: 1