MaatDeamon

Reputation: 9771

Resetting a Kafka Stream Application

I am trying to understand how to use the reset tool for a Kafka Streams application, but I am confused by the explanation.

It is said that there are two stages: a local reset (cleanup), followed by a global reset that uses the reset tool.

The Confluent documentation provides the following example:

https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html#step-2-reset-the-local-environments-of-your-application-instances


    package io.confluent.examples.streams;

    import ...;

    public class ResetDemo {

      public static void main(String[] args) throws Exception {
        // Kafka Streams configuration
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        // ...and so on...

        // Define the processing topology
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic")
            .selectKey(...)
            .through("rekeyed-topic")
            .groupByKey()
            .count("global-count")
            .to("my-output-topic");

        KafkaStreams app = new KafkaStreams(builder.build(), props);

        // Delete the application's local state.
        // Note: In real application you'd call `cleanUp()` only under
        // certain conditions.  See tip on `cleanUp()` below.
        app.cleanUp();

        app.start();

        // Note: In real applications you would register a shutdown hook
        // that would trigger the call to `app.close()` rather than
        // using the sleep-then-close example we show here.
        Thread.sleep(30 * 1000L);
        app.close();
      }

    }

followed by this tip and example:

Tip: To avoid the corresponding recovery overhead, you should not call cleanUp() unconditionally and every time an application instance is restarted or resumed. For example, in a production application you could use command line arguments to enable or disable the cleanUp() call on an as-needed basis.

You can then perform run-reset-modify cycles as follows:

    # Run your application
    bin/kafka-run-class io.confluent.examples.streams.ResetDemo

    # After stopping all application instances, reset the application
    bin/kafka-streams-application-reset --application-id my-streams-app \
                                        --input-topics my-input-topic \
                                        --intermediate-topics rekeyed-topic

    # Now you can modify/recompile as needed and then re-run the application again.
    # You can also experiment, for example, with different input data without
    # modifying the application.

I do not understand it.

  1. Does app.cleanUp() require app.start() to be executed?
  2. Why have app.cleanUp() followed by app.start()? In what circumstances would that be useful?
  3. Why not just have a flag, so that if cleanup is requested only app.cleanUp() runs and app.start() does not?

I simply do not understand the last part of the example.

It says to run your app and then apply the reset tool. But if the app looks like the one in the example, then after the cleanUp() the app will run and we will have local data again. So stopping it will not help the reset tool.

It is very confusing. Can someone who understands it please explain?

Upvotes: 2

Views: 4046

Answers (1)

Nishu Tayal

Reputation: 20860

Running the application reset tool ensures that your application’s state – as tracked globally in the application’s configured Kafka cluster – is reset. However, by design the reset tool does not modify or reset the local environment of your application instances, which includes the application’s local state directory. For a complete application reset you must also delete the application’s local state directory on any machines on which an application instance was run prior to restarting an application instance on the same machines. You can either use the API method KafkaStreams#cleanUp() in your application code or manually delete the corresponding local state directory (state.dir configuration parameter).
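
As a rough illustration of the manual option, the sketch below deletes an application's local state directory with plain java.nio. It assumes the per-application directory lives at `<state.dir>/<application.id>`, and it uses `/tmp/kafka-streams` and `my-streams-app` only as example defaults. The class and method names are made up for this sketch and are not part of Kafka. Something like this should only be run while the application instance on that machine is stopped.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not a Kafka class.
final class LocalStateCleaner {

    // Assumed layout: <state.dir>/<application.id>
    static Path appStateDir(String stateDir, String applicationId) {
        return Paths.get(stateDir, applicationId);
    }

    // Recursively deletes dir. Returns false if there was nothing to delete.
    static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            deepestFirst = paths.sorted(Comparator.reverseOrder())
                                .collect(Collectors.toList());
        }
        for (Path p : deepestFirst) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Example defaults only; pass your own state.dir and application.id.
        String stateDir = args.length > 0 ? args[0] : "/tmp/kafka-streams";
        String appId = args.length > 1 ? args[1] : "my-streams-app";
        Path target = appStateDir(stateDir, appId);
        System.out.println(deleteRecursively(target)
                ? "Deleted " + target
                : "Nothing to delete at " + target);
    }
}
```

Calling KafkaStreams#cleanUp() from the application is usually the simpler route; a standalone helper like this is mainly useful when you cannot change or redeploy the application code.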

The app.cleanUp() method deletes the local state stores. app.start() starts the Kafka Streams application.

  1. Does app.cleanUp() require app.start() to be executed?

No. app.cleanUp() only deletes the instance's local state stores; app.start() is what starts the Kafka Streams application. If you just want to clean up the stores, you can call cleanUp() alone. But if you also want to run the app, you need to call start() too.

  2. Why have app.cleanUp() followed by app.start()? In what circumstances would that be useful?

When you want to reset the application to the earliest offset and run it from the beginning, you need to call cleanUp(). If you want to resume a stopped application from its current state, don't call cleanUp() in your application code. Calling cleanUp() by default on every start is not recommended; do it only when needed. Also, you can't delete the stores of a running application (cleanUp() throws an IllegalStateException if the instance is running), so always call cleanUp() before start(), or after close().

  3. Why not just have a flag, so that if cleanup is requested only app.cleanUp() runs and app.start() does not?

Yes, you can do that based on your use case. Define a flag in a configuration file and add a conditional cleanup. Example:

if(cleanUpFlag){
   app.cleanUp();
}
app.start();
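
The same idea can be driven from a command-line switch, as the tip in the documentation suggests. The sketch below is only an illustration: the `--cleanup` switch and the `StreamsLifecycle` interface are hypothetical, and the interface is a stand-in for the two KafkaStreams calls so that the example compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConditionalCleanUpDemo {

    // Hypothetical stand-in for KafkaStreams#cleanUp() and KafkaStreams#start().
    interface StreamsLifecycle {
        void cleanUp();
        void start();
    }

    // True only when the (hypothetical) --cleanup switch is present.
    static boolean cleanUpRequested(String[] args) {
        return Arrays.asList(args).contains("--cleanup");
    }

    // Wipes local state first if requested, then always starts the app.
    static void launch(StreamsLifecycle app, String[] args) {
        if (cleanUpRequested(args)) {
            app.cleanUp();
        }
        app.start();
    }

    public static void main(String[] args) {
        // Records the call order instead of talking to a real cluster.
        List<String> calls = new ArrayList<>();
        StreamsLifecycle recorder = new StreamsLifecycle() {
            @Override public void cleanUp() { calls.add("cleanUp"); }
            @Override public void start() { calls.add("start"); }
        };
        launch(recorder, args);
        System.out.println(calls);
    }
}
```

In a real application the body of `launch` would call the KafkaStreams instance directly. A normal restart omits the switch and keeps the local state; a restart after running the reset tool passes it.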

If you want to automate the entire process of global and local cleanup, you can write a script that takes a configuration file as input. Set the flag to true in that configuration file (for example, cleanUp.flag=true), and make sure the application checks that config value in its if condition.

// getProperty() returns a String, so parse it instead of comparing with == true
if(Boolean.parseBoolean(properties.getProperty("cleanUp.flag"))){
   app.cleanUp();
}

Example :

# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset --application-id my-streams-app \
                                      --input-topics my-input-topic \
                                      --intermediate-topics rekeyed-topic \
                                      --bootstrap-servers brokerHost:9092 \
                                      --zookeeper zookeeperHost:2181


# Run your application
$ java -DApp.config.file=app.properties -jar KStreamDemo.jar
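
On the application side, reading the flag from the file named by `App.config.file` might look roughly like the sketch below. The `CleanUpFlag` class and its methods are hypothetical names for this illustration; the `cleanUp.flag` key and the `App.config.file` system property are the ones used above, and falling back to `app.properties` when the property is not set is an assumption.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper, not a Kafka class.
final class CleanUpFlag {

    // A missing key or any value other than "true" (case-insensitive) means false.
    static boolean fromProperties(Properties props) {
        return Boolean.parseBoolean(props.getProperty("cleanUp.flag", "false").trim());
    }

    // Loads key=value pairs from any Reader (a file, a string, ...).
    static Properties load(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Assumed fallback file name when -DApp.config.file is not given.
        String configFile = System.getProperty("App.config.file", "app.properties");
        try (Reader reader = Files.newBufferedReader(Paths.get(configFile))) {
            boolean cleanUp = fromProperties(load(reader));
            System.out.println("cleanUp before start: " + cleanUp);
            // In the real application: if (cleanUp) { app.cleanUp(); } app.start();
        }
    }
}
```

Defaulting to false when the key is missing keeps an ordinary restart from wiping local state by accident.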

Upvotes: 1
