Tomboyo

Reputation: 181

Spring Cloud Stream with Kafka Binder: /bindings Actuator API does not stop producer

I have a Spring Cloud Stream project with Actuator and the Kafka binder. I am exploring the /bindings actuator endpoint and, as an exercise, am trying to stop a producer. I make the following POST request via curl:

curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'

Actual Results: The request returns 204. The state of the producer (as reported by GET /actuator/bindings/producer-out-0) is now stopped. However, the producer is still producing messages, as both the logging and the consumer activity on the topic show.
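For reference, the same two calls (the POST that changes the state and the GET that reads it back) can be sketched in Java with the JDK's built-in java.net.http client (Java 11+). The class and method names (BindingsRequests, stopRequest, stateRequest, send) are made up for illustration; only the URL, header, and body come from the curl command above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class BindingsRequests {
  // Binding URL taken from the curl command above.
  static final URI BINDING =
      URI.create("http://localhost:8081/actuator/bindings/producer-out-0");

  // Equivalent of: curl ... -H 'content-type: application/json' -d '{"state": "STOPPED"}'
  static HttpRequest stopRequest() {
    return HttpRequest.newBuilder(BINDING)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{\"state\": \"STOPPED\"}"))
        .build();
  }

  // Equivalent of: GET /actuator/bindings/producer-out-0
  static HttpRequest stateRequest() {
    return HttpRequest.newBuilder(BINDING).GET().build();
  }

  // Sends a request, prints the response body, and returns the HTTP status code.
  static int send(HttpRequest request) throws IOException, InterruptedException {
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
    return response.statusCode();
  }
}
```

Calling BindingsRequests.send(BindingsRequests.stopRequest()) followed by BindingsRequests.send(BindingsRequests.stateRequest()) against the running application reproduces the two steps described here.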

Expected Results: I expected the producer to stop producing messages. (I have also tried using the PAUSED state, which also returns 204, but error logs indicate that this producer cannot be paused.)

Do I misunderstand how this actuator works? When a producer is stopped, is it expected that Spring Cloud Stream will continue to poll that producer? The only documentation I am aware of is here, but it doesn't answer my questions as far as I can tell.

Background:

I am using spring-boot-starter-parent 2.5.3 and have starter-web and starter-actuator listed as dependencies. I don't think I'm missing any.

This is the producer/consumer pair. As you can see I am using a pollable supplier.

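import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Configuration
@Profile("numbers")
public class NumberHandlers {
  private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);

  @Bean
  public Supplier<Integer> producer() {
    // Needed an effectively-final mutable integer. Side-bar comments welcome. :P
    var counter = new AtomicInteger();
    return () -> {
      var n = counter.getAndIncrement();
      LOGGER.info("Producing number: {}", n);
      return n;
    };
  }

  @Bean
  public Consumer<Integer> consumer() {
    return it -> LOGGER.info("Consuming number: {}", it);
  }
}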

These are active when I pass in the numbers profile. My configurations are below.

application.yml:

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${env.kafka.bootstrapservers:localhost}
management:
  endpoints:
    web:
      exposure:
        include: 'bindings'

... and application-numbers.yml:

spring:
  cloud:
    stream:
      poller:
        fixedDelay: 5000
      bindings:
        producer-out-0:
          destination: numbers-raw
          producer:
            partitionCount: 3
        consumer-in-0:
          destination: numbers-raw
      kafka:
        bindings:
          producer-out-0:
            producer:
              topic.properties:
                # These look weird because they're done as an exercise.
                retention.bytes: 10000
                retention.ms: 172800000
    function:
      definition: producer;consumer

I am testing in a localhost environment using a docker-compose kafka and zookeeper on the host network.

Thanks!

Upvotes: 0

Views: 911

Answers (1)

Gary Russell

Reputation: 174749

Lifecycle control of producer bindings is not currently supported, only consumer bindings.
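Since the binding itself cannot be stopped, one possible workaround is to gate the supplier in application code. The following is only a sketch, not something the binder provides: GatedSupplier and its methods are hypothetical names, and it assumes that a polled Supplier returning null results in no message being sent, which should be verified against the Spring Cloud Stream version in use.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical wrapper: delegates to the real supplier only while enabled.
final class GatedSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private final AtomicBoolean enabled = new AtomicBoolean(true);

  GatedSupplier(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  // Flip this from application code (for example a custom endpoint) to pause or resume.
  void setEnabled(boolean value) {
    enabled.set(value);
  }

  boolean isEnabled() {
    return enabled.get();
  }

  @Override
  public T get() {
    // While disabled, the delegate is not invoked and null is returned.
    // ASSUMPTION: the framework sends nothing when the supplier yields null.
    return enabled.get() ? delegate.get() : null;
  }
}
```

In the question's configuration, the lambda returned from producer() could be wrapped in such a gate and the gate toggled from elsewhere in the application. The poller would still fire every fixedDelay, but the counter would not advance while the gate is closed.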

Upvotes: 1
