Reputation: 1238
Matthias, thanks for the information about kafka-streams-application-reset.sh, but it does not appear to reset the state of the world (for a particular Application ID) to the beginning.
Here is the code that defines the guts of my stream operation graph:
    private KStream< Long, TsdbObject > rollUpMetricToAllParents( KStream< Long, TsdbObject > input_stream ) {
        KStream< Long, TsdbObject > result_stream = null;

        // Map the input stream into another stream < "timestamp":ancestor_assetId, metric_value >
        // and then group by the key so that groups consist of all the values at a particular
        // time for all assets that have a common ancestor...
        KStream< String, Double > asset_values_keyed_by_parents =
            input_stream.flatMap( new groupByTimeAndAllParentsMapper() );
        asset_values_keyed_by_parents.print( Printed.toSysOut() );

        KGroupedStream< String, Double > asset_values_grouped_by_parents =
            asset_values_keyed_by_parents.groupByKey( Serialized.with( Serdes.String(), Serdes.Double() ) );

        // And sum up all the values in each group (timestamp:ancestor)
        KTable< String, Double > sums_of_groups_by_parents =
            asset_values_grouped_by_parents.aggregate( new SummerInitializer(), new SummerAggregator(),
                Materialized.with( Serdes.String(), Serdes.Double() ) );

        // Convert the result of the aggregation into a stream of TsdbObjects
        KeyValueMapper< String, Double, KeyValue< Long, TsdbObject > > mapper =
            new RollupMetricStreamMapper();
        result_stream = sums_of_groups_by_parents.toStream().map( mapper );
        result_stream.print( Printed.toSysOut() );

        return result_stream;
    }
In context, this method is called passing the input_stream active_power.
Tail of print operation prior to reset:
...
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=415455.3004360199, tags_to_values={assetId=303967}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=331916.46723365784, tags_to_values={assetId=2}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=329964.34125709534, tags_to_values={assetId=0}]
Perform reset:
> bin/kafka-streams-application-reset.sh --application-id TsdbObjectRollUp --input-topics active_power
Reset-offsets for input topics [active_power]
Following input topics offsets will be reset to (for consumer group TsdbObjectRollUp)
Topic: active_power Partition: 0 Offset: 126
Done.
Deleting all internal/auto-created topics for application TsdbObjectRollUp
Done.
Why is the offset of input topic active_power being reset to 126 instead of 0?
Re-run application
Tail of print operation after reset:
...
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=461617.00048446655, tags_to_values={assetId=303967}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=379054.23027038574, tags_to_values={assetId=2}]
[KSTREAM-MAP-0000000010]: 1517461200000, TsdbObject [timestamp_ms=1517461200000, value=377102.10429382324, tags_to_values={assetId=0}]
Note that the values are greater than in the previous run, before the reset. Why aren't the results the same? What kind of state is not getting reset?
Upvotes: 0
Views: 1785
Reputation: 62350
Why is the offset of input topic active_power being reset to 126 instead of 0?
The tool does a "seek-to-beginning", so I assume that older data was already deleted and 126 is the smallest available offset in the topic.
About the changed result:
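Resetting an application consists of two parts, as explained in the docs: https://kafka.apache.org/11/documentation/streams/developer-guide/app-reset-tool.html
The reset tool covers only the first part (input offsets and internal topics); local state stores on the application instances have to be wiped separately. Did you clean up your local state via KafkaStreams#cleanUp()?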
This blog post explains the details: https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
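To illustrate why leftover local state would inflate the sums, here is a toy model in plain Java, not Kafka Streams code. The HashMap is only a stand-in for the aggregation's local store, and LocalStateDemo, Reading, and process are made-up names for this sketch. It assumes the re-run sees the same input again while the store still holds the old sums.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a HashMap stands in for the local state store of an aggregation.
// None of these names come from Kafka Streams.
class LocalStateDemo {

    // One input record: a grouping key and a metric value.
    static final class Reading {
        final String key;
        final double value;

        Reading(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    // Adds every reading to the running sum stored under its key,
    // the same shape of update a summing aggregator performs.
    static void process(Map<String, Double> store, List<Reading> input) {
        for (Reading r : input) {
            store.merge(r.key, r.value, Double::sum);
        }
    }

    public static void main(String[] args) {
        List<Reading> input = Arrays.asList(
                new Reading("t0:parent", 10.0),
                new Reading("t0:parent", 5.0));

        Map<String, Double> store = new HashMap<>();
        process(store, input);                       // first run
        System.out.println(store.get("t0:parent"));  // 15.0

        process(store, input);                       // input rewound, store kept
        System.out.println(store.get("t0:parent"));  // 30.0

        store.clear();                               // stand-in for wiping local state
        process(store, input);                       // clean re-run
        System.out.println(store.get("t0:parent"));  // 15.0
    }
}
```

In the real application the counterpart of store.clear() would be calling KafkaStreams#cleanUp() before start(), or deleting the state directory by hand.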
Upvotes: 1