Reputation: 1
We have a multi-module project that needs multiple routing functions: we are developing a modular monolith, and a single message has to be consumed by several modules. The problem is that each routing function creates a *-out-0 topic in the message broker, even though in our case we only route to consumers (never to other functions). The documentation says:
"We never create output binding for the RoutingFunction, only input. So when you routing to Consumer, the RoutingFunction effectively becomes as a Consumer by not having any output bindings. However, if RoutingFunction happen to route to another Function which produces the output, the output binding for the RoutingFunction will be create dynamically at which point RoutingFunction will act as a regular Function with regards to bindings (having both input and output bindings)"
(https://github.com/spring-cloud/spring-cloud-stream/commit/b2b95dc7d292578c56ae42b066f437b0f1436e65)
In the commit above you can see that an if condition was added at line 806 of FunctionConfiguration, which checks whether functionDefinition equals the default function name (functionRouter).
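To illustrate why this matters for custom router names, here is a simplified model of the check as described above. It is not the framework's code: the class and method names are hypothetical, and only the literal functionRouter is taken from the description.

```java
// Simplified model only; OutputBindingConditionSketch is a hypothetical name
// and this is NOT the actual FunctionConfiguration logic.
final class OutputBindingConditionSketch {

    // Default routing function name mentioned in the question.
    static final String DEFAULT_ROUTER_NAME = "functionRouter";

    // Assumption: the output binding is skipped only for the default name.
    static boolean skipsOutputBinding(String functionDefinition) {
        return DEFAULT_ROUTER_NAME.equals(functionDefinition);
    }

    public static void main(String[] args) {
        System.out.println(skipsOutputBinding("functionRouter"));    // true
        System.out.println(skipsOutputBinding("firstModuleRouter")); // false
    }
}
```

Under this reading, a routing function registered under any other name (such as firstModuleRouter) does not match the check, which would explain the extra *-out-0 bindings.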
We're using spring-cloud-stream version 3.2.4.
Each message carries a custom header that gives the type of the object in the payload (e.g. DocumentCreatedEvent, DocumentFinishedEvent), and we have the following consumers:
class DocumentCreatedEventConsumer implements Consumer<DocumentCreatedEvent> {
    @Override
    public void accept(final DocumentCreatedEvent event) {
        // do something here...
    }
}

class DocumentFinishedEventConsumer implements Consumer<DocumentFinishedEvent> {
    @Override
    public void accept(final DocumentFinishedEvent event) {
        // do something here...
    }
}
which are registered as beans:
@Bean
Consumer<DocumentCreatedEvent> firstModuleDocumentCreatedEventConsumer() {
    return new DocumentCreatedEventConsumer();
}

@Bean
Consumer<DocumentCreatedEvent> secondModuleDocumentCreatedEventConsumer() {
    return new DocumentCreatedEventConsumer();
}

@Bean
Consumer<DocumentFinishedEvent> firstModuleDocumentFinishedEventConsumer() {
    return new DocumentFinishedEventConsumer();
}

@Bean
Consumer<DocumentFinishedEvent> secondModuleDocumentFinishedEventConsumer() {
    return new DocumentFinishedEventConsumer();
}
Our YAML fragment is as follows:
spring:
  cloud:
    function:
      autodetect: false
      routing.enabled: true
      definition: firstModuleRouter;secondModuleRouter
    stream:
      bindings:
        firstModuleRouter-in-0:
          destination: output-topic
        secondModuleRouter-in-0:
          destination: output-topic
Our bean that registers the routing functions:
@Slf4j
@Configuration
@RequiredArgsConstructor
class RegisterMessageBrokerRouters {

    private final ConfigurableListableBeanFactory configurableListableBeanFactory;
    private final FunctionCatalog functionCatalog;
    private final FunctionProperties functionProperties;
    private final BindingServiceProperties bindingServiceProperties;

    // Registers one RoutingFunction bean per "<module>Router-in-<n>" binding found in the configuration.
    @PostConstruct
    void registerRouters() {
        bindingServiceProperties.getBindings().keySet().stream()
                .filter(it -> it.contains("Router-in-"))
                .map(it -> it.replaceAll("Router-in-\\d+", ""))
                .forEach(module -> {
                    final String beanName = module + "Router";
                    configurableListableBeanFactory.registerSingleton(beanName, createRoutingFunction(module));
                    log.info("Registered bean '" + beanName + "' for module: " + module);
                });
    }

    // Each router gets its own callback so that it only routes to consumers of its module.
    private RoutingFunction createRoutingFunction(final String moduleName) {
        return new RoutingFunction(functionCatalog, functionProperties,
                new BeanFactoryResolver(configurableListableBeanFactory),
                new ConsumerMessageRoutingCallback(functionCatalog, moduleName));
    }
}
The code above registers the routing functions firstModuleRouter and secondModuleRouter.
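To make the name derivation concrete, here is a minimal standalone sketch of just the filter/map step, with no Spring involved. RouterNameSketch and moduleNames are hypothetical names, and the distinct() call is an addition that is not in our code (it would only matter if one router had several input bindings).

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only; RouterNameSketch is a hypothetical helper, not part of Spring.
final class RouterNameSketch {

    // Derives module names from binding names such as "firstModuleRouter-in-0".
    static List<String> moduleNames(Set<String> bindingNames) {
        return bindingNames.stream()
                .filter(name -> name.contains("Router-in-"))        // keep router input bindings only
                .map(name -> name.replaceAll("Router-in-\\d+", "")) // strip the "Router-in-<n>" part
                .distinct()                                          // assumption: avoid registering a name twice
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> bindings = new LinkedHashSet<>(Arrays.asList(
                "firstModuleRouter-in-0",
                "firstModuleRouter-out-0", // output binding, ignored by the filter
                "secondModuleRouter-in-0"));
        System.out.println(moduleNames(bindings)); // [firstModule, secondModule]
    }
}
```

Output bindings such as firstModuleRouter-out-0 do not contain "Router-in-", so they never produce an extra router bean.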
Our ConsumerMessageRoutingCallback class:
@RequiredArgsConstructor
public class ConsumerMessageRoutingCallback implements MessageRoutingCallback {

    private static final String CONSUMER_SUFFIX = "Consumer";
    private static final String DEFAULT_MESSAGE_ROUTING_HANDLER_NAME = "defaultMessageRoutingHandler";

    private final FunctionCatalog functionCatalog;
    private final String moduleName;

    // Routes by the custom payload-type header; falls back to the default handler when it is missing.
    @Override
    public FunctionRoutingResult routingResult(final Message<?> message) {
        final Object contentTypeHeader = message.getHeaders().get(MessagingHeaders.PAYLOAD_TYPE);
        if (contentTypeHeader != null) {
            return new FunctionRoutingResult(route(contentTypeHeader.toString()));
        }
        return new FunctionRoutingResult(DEFAULT_MESSAGE_ROUTING_HANDLER_NAME);
    }

    String route(final String qualifiedPayloadType) {
        final String expectedBeanName = buildExpectedBeanName(moduleName, qualifiedPayloadType);
        return beanExists(expectedBeanName) ? expectedBeanName : DEFAULT_MESSAGE_ROUTING_HANDLER_NAME;
    }

    // Builds "<module><SimpleClassName>Consumer", e.g. firstModuleDocumentCreatedEventConsumer.
    private String buildExpectedBeanName(final String moduleName, final String qualifiedPayloadType) {
        return replaceFirstLetterToLowerCase(nullable(moduleName) + simpleClassName(qualifiedPayloadType) + CONSUMER_SUFFIX);
    }

    private String replaceFirstLetterToLowerCase(final String input) {
        return input.substring(0, 1).toLowerCase() + input.substring(1);
    }

    private String nullable(final String value) {
        return StringUtils.isNotBlank(value) ? value : "";
    }

    private String simpleClassName(final String fullClassName) {
        return fullClassName.substring(fullClassName.lastIndexOf(".") + 1);
    }

    private boolean beanExists(final String expectedCallMethod) {
        return functionCatalog.lookup(expectedCallMethod) != null;
    }
}
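For reference, the naming convention used by the callback can be tried out in isolation. This is a rough standalone sketch of the same string logic without FunctionCatalog or StringUtils; BeanNameSketch and the com.example.events package are hypothetical, and Locale.ROOT is an addition to keep the lower-casing locale-independent.

```java
import java.util.Locale;

// Sketch only; BeanNameSketch is a hypothetical helper, not part of Spring.
final class BeanNameSketch {

    private static final String CONSUMER_SUFFIX = "Consumer";

    // Builds "<module><SimpleClassName>Consumer" with a lower-case first letter.
    static String expectedBeanName(String moduleName, String qualifiedPayloadType) {
        // A null or blank module name contributes nothing to the bean name.
        String module = (moduleName == null || moduleName.trim().isEmpty()) ? "" : moduleName;
        // Keep only the part after the last dot of the qualified class name.
        String simpleName = qualifiedPayloadType.substring(qualifiedPayloadType.lastIndexOf('.') + 1);
        String raw = module + simpleName + CONSUMER_SUFFIX;
        return raw.substring(0, 1).toLowerCase(Locale.ROOT) + raw.substring(1);
    }

    public static void main(String[] args) {
        // "com.example.events" is a made-up package used only for illustration.
        System.out.println(expectedBeanName("firstModule", "com.example.events.DocumentCreatedEvent"));
        System.out.println(expectedBeanName(null, "DocumentFinishedEvent"));
    }
}
```

With the module name firstModule, a DocumentCreatedEvent payload maps to the bean firstModuleDocumentCreatedEventConsumer, which matches the bean names registered earlier.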
In this case spring-cloud-stream creates output bindings (and topics in the message broker) that are never used.
So the question is: is there any way to disable the creation of output bindings for routing functions? We've checked the spring-cloud-stream sources and didn't find a way to do this.
As a workaround, we point the generated output bindings at output-topic in the YAML:
spring:
  cloud:
    function:
      autodetect: false
      routing.enabled: true
      definition: firstModuleRouter;secondModuleRouter
    stream:
      bindings:
        firstModuleRouter-in-0:
          destination: output-topic
        firstModuleRouter-out-0:
          destination: output-topic
        secondModuleRouter-in-0:
          destination: output-topic
        secondModuleRouter-out-0:
          destination: output-topic
But we believe there should be another way to prevent the additional topics from being created in the message broker.
Upvotes: 0
Views: 513
Reputation: 5
This issue has been resolved in Spring Cloud Stream 4.0.1. It was reported on GitHub: https://github.com/spring-cloud/spring-cloud-stream/issues/2590.
Upvotes: 0
Reputation: 61
I have run into this problem before. We solved it by revoking the rights to create topics on the Kafka side. Yes, it is a bad solution, but still.. ))
Upvotes: 0