Clyde Barrow
Clyde Barrow

Reputation: 2102

Spring Actuator + Kafka Streams - Add kafka stream status to health check endpoint

I have a spring boot app where I use apache kafka-streams. I don't use spring cloud streams. I added actuator health check endpoint. I configured it in the application.yml like that:

management:
  health.db.enabled: false
  endpoints.web:
    base-path:
    path-mapping.health: /

When a runtime exception was thrown and my stream was stopped as logs show but the health check status is UP.

2019-09-17 13:16:31.522 INFO 1 --- [ Thread-5] org.apache.kafka.streams.KafkaStreams : stream-client [lpp-model-stream-7e6e8fea-fcad-4033-92a4-5ede50de6e17] Streams client stopped completely

How to bind kafka stream status to health check endpoint ?

My pom.xml:

  <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>
            <dependency>
                <groupId>data-wizards</groupId>
                <artifactId>lpp-common-avro</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-streams-avro-serde</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
            <dependency>
                <groupId>io.vavr</groupId>
                <artifactId>vavr</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

Link to the code where I create the stream: https://gist.github.com/solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3

Upvotes: 8

Views: 15046

Answers (3)

vyacheslav.kislov
vyacheslav.kislov

Reputation: 83

Answer by Yurii J not working in my case.
kafkaStreamsState is always Running even if I stop kafka and logs full of kafka DisconnectException.
I came to next simple solution (for Kakfa health, not including Stores, etc):

@Configuration
@Slf4j
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    @Autowired
    private MeterRegistry meterRegistry;

    @Override
    public Health health() {
        Search connectionsSearch = meterRegistry.find("kafka.producer.connection.count");
        Double kafkaConnections = Optional.ofNullable(connectionsSearch.functionCounter())
                .map(FunctionCounter::count)
                .orElse(0d);
        if (kafkaConnections > 0) {
            return Health.up().build();
        }
        return Health.down().build();
    }
}

Works like a charm, but you have to register kafka metrics using micrometer (MeterRegistry).
Here how we enabling kafka metrics:
in application.yaml - spring.kafka.streams.auto-startup: false
In java:

@Autowired
private MeterRegistry meterRegistry;
...  
kafkaStreams.start();
Thread.sleep(KAFKA_STREAMS_START_DELAY); // 23000, yes this is strange, but we had an unexpected behavior - if enable kafka metrics without this awaiting, then metrics not registered actually
new KafkaStreamsMetrics(kafkaStreams).bindTo(meterRegistry);

Upvotes: 0

Irshad P I
Irshad P I

Reputation: 2844

The health information is collected from all the beans implementing the HealthIndicator interface configured in your application context.

You can create a custom HealthIndicator which you can use to report the Kafka Streams error.

Create your HealthIndicator singleton bean as below

@Component
public class MyHealthIndicator implements HealthIndicator {
    private Exception caughtException = null;
    // Any other information you want to store.
    @Override
    public Health health() {

        if (caughtException == null) {
            return Health
                .up()
                .withDetail("status", "Kafka Streams Running")
                .build();
        }
        else {
            return Health
                .down()
                .withDetail("status", "Not Available")
                .withDetail("error", caughtException.getMessage())
                .build();
        }
    }
    public void setException(Exception caughtException) {
        this.caughtException = caughtException;
    }
}

Then you can Autowire this bean where you are using your Kafka Streams, and you can set the exception as follows.

public class MyApp {
    @Autowire
    private MyHealthIndicator healthIndicator; // You can also use constructor injection instead. 

    // Rest of the code


    public void init() {
        // Streams initialization code here
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            healthIndicator.setException(new Exception(throwable));
        });
    }

}

I hope this helps, but if it doesn't, please give a Minimal, Verifiable, Reproducible Example which someone can work on

Upvotes: 1

Yurii J
Yurii J

Reputation: 286

KafkaStreams maintain an in-memory State which could be mapped to Actuator's health statuses. State could be one of following:CREATED, ERROR, NOT_RUNNING, PENDING_SHUTDOWN, REBALANCING, RUNNING - they are self-explanatory. See docs for state transitions https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/KafkaStreams.State.html

If you're looking for a complete example, you could take following one and update it according to your needs (e.g. you may not count CREATED as status UP). Make sure you have a bean of type KafkaStreams in application context.

//Note that class name prefix before `HealthIndicator` will be camel-cased
//and used as a health component name, `kafkaStreams` here
@Component
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    //if you have multiple instances, inject as Map<String, KafkaStreams>
    //Spring will map KafkaStreams instances by bean names present in context
    //so you can provide status details for each stream by name
    @Autowired
    private KafkaStreams kafkaStreams; 

    @Override
    public Health health() {
        State kafkaStreamsState = kafkaStreams.state();

        // CREATED, RUNNING or REBALANCING
        if (kafkaStreamsState == State.CREATED || kafkaStreamsState.isRunning()) {
            //set details if you need one
            return Health.up().build();
        }

        // ERROR, NOT_RUNNING, PENDING_SHUTDOWN, 
        return Health.down().withDetail("state", kafkaStreamsState.name()).build();
    }
}

Then the health endpoint will display it like:

{
    "status": "UP",
    "kafkaStreams": {
        "status": "DOWN",
        "details": {  //not included if "UP"
            "state": "NOT_RUNNING"
        }
    }
}

Upvotes: 17

Related Questions