bgorecki
bgorecki

Reputation: 1

Spring cloud streams - multiple routing functions in multimodule project creates output bindings

We've got a multi-module project which needs multiple routing functions (our case is that we're developing modular-monolithic application and we need to consume one message in multiple modules). The problem is that routing functions creates *-out-0 topics in message broker, but in our case we've got only routing to consumers (not to another functions). Documentation says that

We never create output binding for the RoutingFunction, only input. So when you routing to Consumer, the RoutingFunction effectively becomes as a Consumer by not having any output bindings. However, if RoutingFunction happen to route to another Function which produces the output, the output binding for the RoutingFunction will be create dynamically at which point RoutingFunction will act as a regular Function with regards to bindings (having both input and output bindings)

(https://github.com/spring-cloud/spring-cloud-stream/commit/b2b95dc7d292578c56ae42b066f437b0f1436e65)

In above commit you can see that there's if statement condition added in line 806 in FunctionConfiguration which checks if functionDefinition has default function name (functionRouter).

We're using spring-cloud-stream in version 3.2.4

In payload of message we've got custom header which tells what's the type of object in payload (e.g. DocumentCreatedEvent, DocumentFinishedEvent) and consumers:

class DocumentCreatedEventConsumer implements Consumer<DocumentCreatedEvent> {
    @Override
    void accept(final DocumentCreatedEvent event) {
        // do something here...
    }
}

class DocumentFinishedEventConsumer implements Consumer<DocumentFinishedEvent> {
    @Override
    void accept(final DocumentFinishedEvent event) {
        // do something here...
    }
}

which are registered as beans:

@Bean
Consumer<DocumentCreatedEvent> firstModuleDocumentCreatedEventConsumer() {
    return new DocumentCreatedEventConsumer();
}

@Bean
Consumer<DocumentCreatedEvent> secondModuleDocumentCreatedEventConsumer() {
    return new DocumentCreatedEventConsumer();
}

@Bean
Consumer<DocumentFinishedEvent> firstModuleDocumentFinishedEventConsumer() {
    return new DocumentFinishedEventConsumer();
}

@Bean
Consumer<DocumentFinishedEvent> secondModuleDocumentFinishedEventConsumer() {
    return new DocumentFinishedEventConsumer();
}

Our yaml fragment is as below:

spring:
  cloud:
    function:
      autodetect: false
      routing.enabled: true
      definition: firstModuleRouter;secondModuleRouter
    stream:
      bindings:
        firstModuleRouter-in-0:
          destination: output-topic
        secondModuleRouter-in-0:
          destination: output-topic

Our bean which register routing functions:

@Slf4j
@Configuration
@RequiredArgsConstructor
class RegisterMessageBrokerRouters {
    private final ConfigurableListableBeanFactory configurableListableBeanFactory;
    private final FunctionCatalog functionCatalog;
    private final FunctionProperties functionProperties;
    private final BindingServiceProperties bindingServiceProperties;

    @PostConstruct
    void registerRouters() {
        bindingServiceProperties.getBindings().keySet().stream()
            .filter(it -> it.contains("Router-in-"))
            .map(it -> it.replaceAll("Router-in-\\d+", ""))
            .forEach(module -> {
                final String beanName = module + "Router";
                configurableListableBeanFactory.registerSingleton(beanName, createRoutingFunction(module));
                log.info("Registered bean '" + beanName + "' for module: " + module);
            });
    }

    private RoutingFunction createRoutingFunction(final String moduleName) {
        return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(configurableListableBeanFactory),
            new ConsumerMessageRoutingCallback(functionCatalog, moduleName));
    }
}

Above code register routing functions: firstModuleRouter and secondModuleRouter

Our ConsumerMessageRoutingCallback class:

@RequiredArgsConstructor
public class ConsumerMessageRoutingCallback implements MessageRoutingCallback {
    private static final String CONSUMER_SUFFIX = "Consumer";
    private static final String DEFAULT_MESSAGE_ROUTING_HANDLER_NAME = "defaultMessageRoutingHandler";
    private final FunctionCatalog functionCatalog;
    private final String moduleName;

    @Override
    public FunctionRoutingResult routingResult(final Message<?> message) {
        final Object contentTypeHeader = message.getHeaders().get(MessagingHeaders.PAYLOAD_TYPE);
        if (contentTypeHeader != null) {
            return new FunctionRoutingResult(route(contentTypeHeader.toString()));
        }
        return new FunctionRoutingResult(DEFAULT_MESSAGE_ROUTING_HANDLER_NAME);
    }

    String route(final String qualifiedPayloadType) {
        final String expectedBeanName = buildExpectedBeanName(moduleName, qualifiedPayloadType);
        return beanExists(expectedBeanName) ? expectedBeanName : DEFAULT_MESSAGE_ROUTING_HANDLER_NAME;
    }

    private String buildExpectedBeanName(final String moduleName, final String qualifiedPayloadType) {
        return replaceFirstLetterToLowerCase(nullable(moduleName) + simpleClassName(qualifiedPayloadType) + CONSUMER_SUFFIX);
    }

    private String replaceFirstLetterToLowerCase(final String input) {
        return input.substring(0, 1).toLowerCase() + input.substring(1);
    }

    private String nullable(final String value) {
        return StringUtils.isNotBlank(value) ? value : "";
    }

    private String simpleClassName(final String fullClassName) {
        return fullClassName.substring(fullClassName.lastIndexOf(".") + 1);
    }

    private boolean beanExists(final String expectedCallMethod) {
        return functionCatalog.lookup(expectedCallMethod) != null;
    }
}

In this case spring-cloud-stream create's output binding (and topics in message broker) which are not used:

So the question is: is there any way to disable creating of output bindings from routing functions? We've checked sources of spring-cloud-stream and we didn't find any way to do this.

As a workaround we've found the solution to forward created output bindings to output-topic as defined in yaml:

spring:
  cloud:
    function:
      autodetect: false
      routing.enabled: true
      definition: firstModuleRouter;secondModuleRouter
    stream:
      bindings:
        firstModuleRouter-in-0:
          destination: output-topic
        firstModuleRouter-out-0:
          destination: output-topic
        secondModuleRouter-in-0:
          destination: output-topic
        secondModuleRouter-out-0:
          destination: output-topic

But we believe there should be any other solution to disable creating of additional topics in message broker.

Upvotes: 0

Views: 513

Answers (2)

rafallezanko
rafallezanko

Reputation: 5

The issue has been resolved in Spring Cloud Stream 4.0.1. It was reported at GitHub https://github.com/spring-cloud/spring-cloud-stream/issues/2590.

Upvotes: 0

Axiomatic-1
Axiomatic-1

Reputation: 61

I already have met that problem. We have solved it by taking the rights for creating topics on kafka side, and yes it is bad solution, but still.. ))

Upvotes: 0

Related Questions