Reputation: 9771
I am trying to understand how to use the application reset tool for a Kafka Streams application, but I am confused by the explanation.
The documentation says there are two stages: a local reset (cleanup), followed by a global reset that is performed with the reset tool.
The following example is provided in the Confluent documentation:
https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html#step-2-reset-the-local-environments-of-your-application-instances
    package io.confluent.examples.streams;

    import ...;

    public class ResetDemo {

        public static void main(String[] args) throws Exception {
            // Kafka Streams configuration
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            // ...and so on...

            // Define the processing topology
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("my-input-topic")
                .selectKey(...)
                .through("rekeyed-topic")
                .groupByKey()
                .count("global-count")
                .to("my-output-topic");

            KafkaStreams app = new KafkaStreams(builder.build(), props);

            // Delete the application's local state.
            // Note: In real application you'd call `cleanUp()` only under
            // certain conditions. See tip on `cleanUp()` below.
            app.cleanUp();

            app.start();

            // Note: In real applications you would register a shutdown hook
            // that would trigger the call to `app.close()` rather than
            // using the sleep-then-close example we show here.
            Thread.sleep(30 * 1000L);
            app.close();
        }
    }
It is followed by this tip and a second example:
Tip: To avoid the corresponding recovery overhead, you should not call cleanUp() unconditionally and every time an application instance is restarted or resumed. For example, in a production application you could use command line arguments to enable or disable the cleanUp() call on an as-needed basis.
You can then perform run-reset-modify cycles as follows:
# Run your application
bin/kafka-run-class io.confluent.examples.streams.ResetDemo
# After stopping all application instances, reset the application
bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic
# Now you can modify/recompile as needed and then re-run the application again.
# You can also experiment, for example, with different input data without
# modifying the application.
I do not understand the last part of the example: run your app, then apply the reset tool. If the app looks like the one in the example, then after cleanUp() the app runs and builds up local state again. So stopping the app before running the reset tool still leaves local data behind, which the reset tool does not touch.
It is very confusing. Can someone who understands it please explain?
Upvotes: 2
Views: 4046
Reputation: 20860
Running the application reset tool ensures that your application’s state, as tracked globally in the application’s configured Kafka cluster, is reset. However, by design the reset tool does not modify or reset the local environment of your application instances, which includes the application’s local state directory.
For a complete application reset you must also delete the application’s local state directory on any machines on which an application instance was run prior to restarting an application instance on the same machines. You can either use the API method KafkaStreams#cleanUp() in your application code or manually delete the corresponding local state directory (state.dir configuration parameter).
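The manual option can be sketched in plain Java. This is only an illustration, not part of Kafka Streams: the class and method names are hypothetical, and it assumes the local state of an application lives in a subdirectory of state.dir named after the application ID (for example /tmp/kafka-streams/my-streams-app). Check your own state.dir setting before deleting anything, and run it only while no instance of the application is running on that machine.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not a Kafka Streams class.
final class LocalStateCleaner {

    // Deletes <stateDir>/<applicationId> and everything below it.
    // Returns false if that directory does not exist.
    static boolean deleteAppState(Path stateDir, String applicationId) {
        Path appDir = stateDir.resolve(applicationId);
        if (!Files.exists(appDir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(appDir)) {
            // Reverse order so that children are deleted before their parents.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java LocalStateCleaner <state.dir> <application.id>");
            return;
        }
        boolean deleted = deleteAppState(Paths.get(args[0]), args[1]);
        System.out.println(deleted ? "local state deleted" : "nothing to delete");
    }
}
```

Calling KafkaStreams#cleanUp() from the application is usually simpler, because it uses the state.dir and application ID the instance is already configured with.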
The app.cleanUp() method deletes the application's local state stores. The app.start() method starts the Kafka Streams application.
- Does app.cleanUp() require app.start() to be executed?
No. app.cleanUp() only deletes the local state stores, and app.start() is what starts the Kafka Streams application. If you just want to clean up the stores, you can call cleanUp() alone. If you also want to run the app, you need to call start() as well.
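- Why have app.cleanUp() followed by app.start()? In what circumstances would that be useful?
When you want to reset the application and reprocess the input from the earliest offset, call cleanUp() before starting. In the run-reset-modify cycle this is what removes the local state left behind by the previous run: the reset tool resets the global state while the app is stopped, and the cleanUp() call at the start of the next run deletes the local state. If you want to resume a stopped application from its current state, don't call cleanUp() in your application code.
Calling cleanUp() unconditionally as part of the application is not recommended unless you need it. Also, you can't delete the stores of a running application (cleanUp() throws an IllegalStateException if the instance is running), so always call cleanUp() before start() or after close().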
- Why not just have a flag, so that if it is set the app only runs app.cleanUp() and does not run app.start()?
Yes, you can do that, depending on your use case. Define a flag in a configuration file and make the cleanup conditional. Example:
    if (cleanUpFlag) {
        app.cleanUp();
    }
    app.start();
If you want to automate the entire process of global and local cleanup, you can write a script that takes a configuration file as input. Set the flag in that configuration file to true (for example cleanUp.flag=true) and make sure the application checks that value in its if condition. getProperty() returns a String, so it has to be parsed rather than compared with true:
    if (Boolean.parseBoolean(properties.getProperty("cleanUp.flag"))) {
        app.cleanUp();
    }
Example:
# After stopping all application instances, reset the application
$ bin/kafka-streams-application-reset --application-id my-streams-app \
--input-topics my-input-topic \
--intermediate-topics rekeyed-topic \
--bootstrap-servers brokerHost:9092 \
--zookeeper zookeeperHost:2181
# Run your application
$ java -DApp.config.file=app.properties -jar KStreamDemo.jar
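For the command above to have any effect, the application has to read the file named by the App.config.file system property and parse the flag from it. A minimal sketch of that part, assuming the file contains a line such as cleanUp.flag=true; the class and method names are hypothetical, and the Kafka Streams calls are left as a comment so that the snippet runs without the Kafka libraries:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper for illustration; not a Kafka Streams class.
final class CleanUpFlag {

    // True only if cleanUp.flag is set to "true" (case-insensitive).
    // A missing key or any other value means "do not clean up".
    static boolean shouldCleanUp(Properties props) {
        return Boolean.parseBoolean(props.getProperty("cleanUp.flag", "false").trim());
    }

    // Loads a .properties file from the given path.
    static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Set on the command line with -DApp.config.file=app.properties
        String configFile = System.getProperty("App.config.file", "app.properties");
        boolean cleanUp = shouldCleanUp(load(configFile));
        System.out.println("cleanUp before start: " + cleanUp);
        // In the real application this is where you would do:
        //   if (cleanUp) { app.cleanUp(); }
        //   app.start();
    }
}
```

Defaulting to false when the key is missing keeps a normal restart from wiping local state by accident; a reset then has to be requested explicitly in the configuration file.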
Upvotes: 1