Mark Lavin

Reputation: 1238

Why doesn't bin/kafka-streams-application-reset.sh reset my application?

Matthias, thanks for the information about kafka-streams-application-reset.sh, but it does not appear to reset the state of the world (for a particular application ID) back to the beginning.

Here is the code that defines the guts of my stream operation graph:

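import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Serialized;

private KStream< Long, TsdbObject > rollUpMetricToAllParents( KStream< Long, TsdbObject > input_stream ) {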

    KStream< Long, TsdbObject > result_stream = null;

    // Map the input stream into another stream < "timestamp":ancestor_assetId, metric_value >
    // and then group by the key so that groups consist of all the values at a particular
    // time for all assets that have a common ancestor...
    KStream< String, Double > asset_values_keyed_by_parents =
            input_stream.flatMap( new groupByTimeAndAllParentsMapper() );

    asset_values_keyed_by_parents.print( Printed.toSysOut() );

    KGroupedStream< String, Double > asset_values_grouped_by_parents =
        asset_values_keyed_by_parents.groupByKey( Serialized.with( Serdes.String(), Serdes.Double() ) );

    // And sum up all the values in each group (timestamp:ancestor)
    KTable< String, Double > sums_of_groups_by_parents = 
            asset_values_grouped_by_parents.aggregate( new SummerInitializer(), new SummerAggregator(),
                    Materialized.with( Serdes.String(), Serdes.Double())  );

    // Convert the result of the aggregation into a stream of TsdbObjects
    KeyValueMapper< String, Double, KeyValue< Long, TsdbObject > > mapper = 
            new RollupMetricStreamMapper();
    result_stream = sums_of_groups_by_parents.toStream().map( mapper );

    result_stream.print( Printed.toSysOut() );

    return result_stream;       
}

In context, this method is called with the stream read from the input topic active_power.
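The comments in the method describe a fan-out to every ancestor followed by a keyed sum. A stdlib-only model of that data flow may make it clearer what the aggregation's state store accumulates, which is the state this question is about. It is a sketch under assumptions: `Reading`, the `parentOf` hierarchy, and the choice to emit one pair per ancestor (excluding the asset itself) are hypothetical stand-ins for `TsdbObject`, `groupByTimeAndAllParentsMapper`, `SummerInitializer`, and `SummerAggregator`, whose code is not shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of flatMap -> groupByKey -> aggregate (NOT Kafka code).
class RollUpModel {
    // Stand-in for TsdbObject: one metric value for one asset at one timestamp.
    static final class Reading {
        final long timestampMs;
        final long assetId;
        final double value;

        Reading(long timestampMs, long assetId, double value) {
            this.timestampMs = timestampMs;
            this.assetId = assetId;
            this.value = value;
        }
    }

    // Walk up a hypothetical child -> parent map (assumed acyclic); roots have no entry.
    static List<Long> ancestors(long assetId, Map<Long, Long> parentOf) {
        List<Long> result = new ArrayList<>();
        for (Long p = parentOf.get(assetId); p != null; p = parentOf.get(p)) {
            result.add(p);
        }
        return result;
    }

    // "flatMap": one ("timestamp:ancestorId", value) pair per ancestor,
    // then a per-key sum, which is what the aggregation's state store holds.
    static Map<String, Double> rollUp(List<Reading> readings, Map<Long, Long> parentOf) {
        Map<String, Double> sums = new LinkedHashMap<>();
        for (Reading r : readings) {
            for (long ancestor : ancestors(r.assetId, parentOf)) {
                sums.merge(r.timestampMs + ":" + ancestor, r.value, Double::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical hierarchy: assets 10 and 11 belong to 2, which belongs to 0.
        Map<Long, Long> parentOf = Map.of(10L, 2L, 11L, 2L, 2L, 0L);
        List<Reading> readings = List.of(
                new Reading(1517461200000L, 10L, 1.5),
                new Reading(1517461200000L, 11L, 2.5));
        System.out.println(rollUp(readings, parentOf));
        // prints {1517461200000:2=4.0, 1517461200000:0=4.0}
    }
}
```

Keys are strings of the form "timestamp:ancestorId", mirroring the comment in the original method; every reading under a common ancestor adds to the same running sum.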

Tail of the print output before the reset:

...
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=415455.3004360199, tags_to_values={assetId=303967}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=331916.46723365784, tags_to_values={assetId=2}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=329964.34125709534, tags_to_values={assetId=0}]

Performing the reset:

> bin/kafka-streams-application-reset.sh --application-id TsdbObjectRollUp --input-topics active_power
Reset-offsets for input topics [active_power]
Following input topics offsets will be reset to (for consumer group TsdbObjectRollUp)
Topic: active_power Partition: 0 Offset: 126
Done.
Deleting all internal/auto-created topics for application TsdbObjectRollUp
Done.

Why is the offset of input topic active_power being reset to 126 instead of 0?

Re-running the application.

Tail of the print output after the reset:

...
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=461617.00048446655, tags_to_values={assetId=303967}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=379054.23027038574, tags_to_values={assetId=2}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=377102.10429382324, tags_to_values={assetId=0}]

Note that the values are greater than in the previous run, before the reset. Why are the results not the same? What kind of state is not getting reset?

Upvotes: 0

Views: 1785

Answers (1)

Matthias J. Sax

Reputation: 62350

Why is the offset of input topic active_power being reset to 126 instead of 0?

The tool does a "seek-to-beginning". Thus, I assume that older data was already deleted (for example, by the topic's retention policy) and that 126 is the smallest available offset in the topic.
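A toy model of a single partition shows why "seek-to-beginning" can land on 126 instead of 0. `RetainedPartition`, `RetentionDemo`, and their methods are hypothetical, not Kafka classes; they only mimic the rule that Kafka never renumbers offsets when retention deletes old data, so the beginning of the log is the smallest offset still retained. Against a real cluster, `KafkaConsumer#beginningOffsets` returns that value for each partition.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of one partition (NOT a Kafka API).
// Offsets are assigned once, on append, and are never reused or renumbered.
class RetainedPartition {
    private final Deque<String> records = new ArrayDeque<>(); // records still retained
    private long logStartOffset = 0; // offset of the oldest retained record
    private long nextOffset = 0;     // offset the next append will receive

    long append(String value) {
        records.addLast(value);
        return nextOffset++;
    }

    // Models retention deleting old data: drop every record below newStart.
    void deleteRecordsBefore(long newStart) {
        long target = Math.min(newStart, nextOffset);
        while (logStartOffset < target) {
            records.pollFirst();
            logStartOffset++;
        }
    }

    // "Seek to beginning" resolves to the smallest retained offset, not to 0.
    long beginningOffset() {
        return logStartOffset;
    }

    long endOffset() {
        return nextOffset;
    }

    int retainedCount() {
        return records.size();
    }
}

class RetentionDemo {
    // Append n records; their offsets will be 0..n-1.
    static RetainedPartition filled(int n) {
        RetainedPartition partition = new RetainedPartition();
        for (int i = 0; i < n; i++) {
            partition.append("record-" + i);
        }
        return partition;
    }

    public static void main(String[] args) {
        RetainedPartition activePower = filled(200); // 200 is an arbitrary example size
        activePower.deleteRecordsBefore(126);        // pretend retention removed offsets 0..125
        System.out.println(activePower.beginningOffset()); // 126
        System.out.println(activePower.retainedCount());   // 74
    }
}
```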

About the changed result:

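Resetting an application consists of two parts, as explained in the docs (https://kafka.apache.org/11/documentation/streams/developer-guide/app-reset-tool.html): a global reset with the tool, which resets the input-topic offsets and deletes the internal topics, and a local reset, which deletes each instance's local state stores. Did you clean up your local state via KafkaStreams#cleanUp()?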
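The effect of skipping the local part can be illustrated with a toy model of the sum aggregation. `SumAggregation`, `ResetDemo`, `process`, and `cleanUp` below are hypothetical stand-ins, not Kafka Streams classes: the `localStore` map plays the role of the RocksDB store the aggregation keeps under `state.dir`, which the reset tool does not touch. In a real application the local step corresponds to calling `streams.cleanUp()` before `streams.start()` (it may only be called before the instance is started or after it is closed).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (NOT the Kafka Streams API) of a sum aggregation whose
// state lives in a local store that survives restarts, the way a RocksDB store
// under state.dir survives the reset tool.
class SumAggregation {
    // Stand-in for the local state store: key -> running sum.
    final Map<String, Double> localStore = new HashMap<>();

    // Consume records; a real reset rewinds the input so these are replayed.
    void process(List<Map.Entry<String, Double>> input) {
        for (Map.Entry<String, Double> record : input) {
            // Equivalent to an initializer of 0.0 plus an aggregator of sum + value.
            localStore.merge(record.getKey(), record.getValue(), Double::sum);
        }
    }

    // Stand-in for KafkaStreams#cleanUp(): wipe local state before reprocessing.
    void cleanUp() {
        localStore.clear();
    }
}

class ResetDemo {
    static final String KEY = "1517461200000:2";
    static final List<Map.Entry<String, Double>> INPUT =
            List.of(Map.entry(KEY, 10.0), Map.entry(KEY, 5.0));

    // Run once, "reset" (rewind the input), then reprocess the same input.
    static double sumAfterReprocessing(boolean cleanUpLocalState) {
        SumAggregation app = new SumAggregation();
        app.process(INPUT);            // first run
        if (cleanUpLocalState) {
            app.cleanUp();             // local reset
        }
        app.process(INPUT);            // replay after the global reset
        return app.localStore.get(KEY);
    }

    public static void main(String[] args) {
        System.out.println(sumAfterReprocessing(false)); // 30.0: stale state + replayed input
        System.out.println(sumAfterReprocessing(true));  // 15.0: same as the first run
    }
}
```

In this model the stale sum doubles only because the same input is replayed in full; how far a real run overshoots depends on which records are still in the input topic when it is reprocessed.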

This blog post explains the details: https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

Upvotes: 1
