Reputation: 579
My Spring Integration app needs to switch on demand between Kafka and a legacy messaging library (TIBCO Rendezvous, for which Spring Integration doesn't provide a default outbound gateway implementation) through a simple config change.
The legacy messaging library provides a basic request/reply method:
class LegacyTransport {
    Object request(Object query, String topic);
}
I'm trying to find the best way to abstract both outbound messaging gateways (Kafka and legacy) so that I can swap one for the other inside my main IntegrationFlow through a simple config change.
My current idea is to use the method below as part of my main IntegrationFlow:
IntegrationFlowDefinition.gateway(IntegrationFlow flow)
First, create two conditional subflow factory beans with the same name, each wrapping one of my messaging gateways:
@ConditionalOnProperty(name="messaging",havingValue=[TIBCO/KAFKA])
@Bean
Function<String,IntegrationFlow> gatewaySubflowFactory() {
    return (String topic) -> ((IntegrationFlowDefinition<?> f) ->
        f.handle(
            [messaging library specific implementation here]
        ));
}
Then use that bean in my main IntegrationFlow:
@Bean
public IntegrationFlow mainFlow(Function<String,IntegrationFlow> gatewaySubflowFactory) {
    return IntegrationFlows.from(INPUT_CHANNEL)
        ...
        [do some useful processing here]
        ...
        .gateway(gatewaySubflowFactory.apply("some_topic"))
        ...
        [do more useful stuff with gateway output here]
        ...
        .get();
}
Is there a better (simpler?) way?
Thanks a lot in advance for your expertise, your ideas and your time.
Best Regards
Upvotes: 0
Views: 216
Reputation: 121560
Any outbound gateway is just a particular implementation of the more general service activator pattern, so your LegacyTransport.request() could be wrapped in a service activator configuration. That's the first point.
Second, and never forget this: one of the first-class citizens in Spring Integration is the MessageChannel abstraction. Whether the endpoint is a regular service activator or a Kafka-specific outbound gateway doesn't matter; the way to interact with it is the message channel configured as the endpoint's input.
So your Kafka and Tibco flows could both start from the same channel, and your main flow just sends its output to that channel. See IntegrationFlowDefinition.channel() for more info.
Both of those flows can be marked with @ConditionalOnProperty so that only one of them is present at runtime.
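The indirection described above can be illustrated without any Spring classes. The following is only a toy model in plain Java, not Spring Integration code: `ChannelRegistry`, `configure`, and the reply strings are hypothetical names invented for the illustration, and the registry allows a single subscriber per channel purely for simplicity. It shows the main flow depending on a channel name only, while a config value decides which handler sits behind that channel.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ChannelSwitchSketch {

    // Toy stand-in for a request/reply channel: a name with exactly one subscribed handler.
    static final class ChannelRegistry {
        private final Map<String, Function<Object, Object>> subscribers = new HashMap<>();

        void subscribe(String channel, Function<Object, Object> handler) {
            if (subscribers.putIfAbsent(channel, handler) != null) {
                throw new IllegalStateException("channel already has a subscriber: " + channel);
            }
        }

        Object sendAndReceive(String channel, Object payload) {
            Function<Object, Object> handler = subscribers.get(channel);
            if (handler == null) {
                throw new IllegalStateException("no subscriber for channel: " + channel);
            }
            return handler.apply(payload);
        }
    }

    static final String OUTBOUND_GATEWAY_CHANNEL = "outboundGatewayChannel";

    // Plays the role of the two conditional flows: only one handler gets registered.
    static ChannelRegistry configure(String messaging) {
        ChannelRegistry registry = new ChannelRegistry();
        switch (messaging) {
            case "KAFKA":
                registry.subscribe(OUTBOUND_GATEWAY_CHANNEL, p -> "kafka reply to " + p);
                break;
            case "TIBCO":
                registry.subscribe(OUTBOUND_GATEWAY_CHANNEL, p -> "tibco reply to " + p);
                break;
            default:
                throw new IllegalArgumentException("unknown messaging value: " + messaging);
        }
        return registry;
    }

    // Plays the role of the main flow: it knows the channel name, not the transport.
    static Object mainFlow(ChannelRegistry registry, Object input) {
        Object reply = registry.sendAndReceive(OUTBOUND_GATEWAY_CHANNEL, input);
        return "processed(" + reply + ")";
    }

    public static void main(String[] args) {
        System.out.println(mainFlow(configure("KAFKA"), "query"));
        System.out.println(mainFlow(configure("TIBCO"), "query"));
    }
}
```

The point of the design is that `mainFlow` never changes when the transport does; only the registration step reads the config value.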
To summarize my reasoning, here is a config sketch:
@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows.from(INPUT_CHANNEL)
        ...
        [do some useful processing here]
        ...
        .gateway(OUTBOUND_GATEWAY_CHANNEL)
        ...
        [do more useful stuff with gateway output here]
        ...
        .get();
}
@ConditionalOnProperty(name="messaging",havingValue=KAFKA)
@Bean
public IntegrationFlow kafkaFlow() {
    return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL)
        .handle(Kafka.outboundGateway())
        .get();
}
@ConditionalOnProperty(name="messaging",havingValue=TIBCO)
@Bean
public IntegrationFlow tibcoFlow() {
    return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL)
        .handle(legacyTransport, "request")
        .get();
}
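One detail the sketch leaves open: `request` takes two arguments (the query and the topic), so the topic has to come from somewhere when the method is invoked with a message payload. One possible way, shown here as a minimal plain-Java sketch, is a small adapter that fixes the topic and exposes a single-argument call. `LegacyRequestAdapter` is a hypothetical name, and `LegacyTransport` is modeled as an interface only so that a fake can stand in for the real library class.

```java
import java.util.function.Function;

final class LegacyGatewayAdapterSketch {

    // Mirrors the method from the question; the real type comes from the legacy library.
    // Modeled as an interface here (an assumption) so a lambda can act as a test double.
    interface LegacyTransport {
        Object request(Object query, String topic);
    }

    // Hypothetical adapter: binds the topic once so callers only pass the payload.
    static final class LegacyRequestAdapter implements Function<Object, Object> {
        private final LegacyTransport transport;
        private final String topic;

        LegacyRequestAdapter(LegacyTransport transport, String topic) {
            this.transport = transport;
            this.topic = topic;
        }

        @Override
        public Object apply(Object payload) {
            // Delegates to the legacy request/reply call with the bound topic.
            return transport.request(payload, topic);
        }
    }

    public static void main(String[] args) {
        // Fake transport that echoes its inputs, standing in for the real library.
        LegacyTransport fake = (query, topic) -> topic + ":" + query;
        Function<Object, Object> adapter = new LegacyRequestAdapter(fake, "some_topic");
        System.out.println(adapter.apply("ping"));
    }
}
```

In the tibcoFlow above, an adapter bean like this should be usable in place of the raw transport, for example via `.handle(adapter, "apply")`. If the topic varies per message, reading it from a message header inside the handler would be the alternative.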
Upvotes: 2